This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 8afb30787d [#10381][#10378] fix(clickhouse-catalog): Fix ClickHouse alter-table bugs (#10383)
8afb30787d is described below

commit 8afb30787d152fb1c577202d9ddfe1b0e919d93b
Author: Qi Yu <[email protected]>
AuthorDate: Fri Mar 20 18:05:10 2026 +0800

    [#10381][#10378] fix(clickhouse-catalog): Fix ClickHouse alter-table bugs (#10383)
    
    ### What changes were proposed in this pull request?
    
    Add comprehensive integration coverage for ClickHouse alter-table
    behavior in:
    - `CatalogClickHouseIT`
    - `CatalogClickHouseClusterIT`
    
    The new tests exercise `ClickHouseTableOperations#generateAlterTableSql`
    branches in real ClickHouse environments, including:
    - supported alter branches: add column, update type, update default,
    update comment, update position, update nullability, delete index,
    delete column
    - edge/no-op branches: delete missing column/index with `ifExists=true`
    - unsupported branches: set/remove property, update auto increment,
    nested column add
    
    Tests also verify resulting table metadata (column
    type/comment/nullability/index removal) after alter operations.
    
    ### Why are the changes needed?
    
    To prevent regressions in ClickHouse ALTER TABLE SQL generation and
    close branch-level test gaps for issues discussed in #10381 and #10378.
    
    Fixed: #10381
    Fixed: #10372
    Fixed: #10396
    Fixed: #10405
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A
    
    
    Closes #10381
    Closes #10378
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 .../operations/ClickHouseClusterUtils.java         | 169 +++++++++
 .../operations/ClickHouseDatabaseOperations.java   | 129 ++++++-
 .../operations/ClickHouseTableOperations.java      | 148 ++++++--
 .../test/CatalogClickHouseClusterIT.java           | 395 ++++++++++++++++++++-
 .../integration/test/CatalogClickHouseIT.java      | 256 ++++++++++++-
 .../TestClickHouseDatabaseOperations.java          | 142 ++++++--
 .../operations/TestClickHouseEngineIT.java         | 360 +++++++++++++++++++
 .../operations/TestClickHouseTableOperations.java  | 131 ++++++-
 .../TestClickHouseTableOperationsCluster.java      | 145 ++++++++
 .../catalog/jdbc/JdbcCatalogOperations.java        |   2 +
 docs/jdbc-clickhouse-catalog.md                    |  42 ++-
 .../integration/test/util/TestDatabaseName.java    |   1 +
 12 files changed, 1849 insertions(+), 71 deletions(-)

diff --git a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseClusterUtils.java b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseClusterUtils.java
new file mode 100644
index 0000000000..830c032f70
--- /dev/null
+++ b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseClusterUtils.java
@@ -0,0 +1,169 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse.operations;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Utilities for embedding and extracting ClickHouse cluster metadata in object COMMENT fields.
+ *
+ * <p><b>Why COMMENT?</b>
+ *
+ * <p>ClickHouse does not persist {@code ON CLUSTER} information in any queryable system table
+ * for non-Replicated objects:
+ *
+ * <ul>
+ *   <li>{@code SHOW CREATE DATABASE} omits the {@code ON CLUSTER} clause.
+ *   <li>{@code SHOW CREATE TABLE} omits the {@code ON CLUSTER} clause (each node stores the
+ *       local DDL without the distribution directive).
+ *   <li>{@code system.databases.cluster} is only populated for {@code Replicated}-engine
+ *       databases.
+ * </ul>
+ *
+ * <p>Gravitino therefore embeds the cluster name inside the object's COMMENT field at creation
+ * time, separated from the user comment by a newline. The metadata line is always stripped
+ * before surfacing the comment to callers.
+ *
+ * <p><b>Stored format:</b>
+ *
+ * <pre>userComment
+ * [Gravitino] ch.cluster=clusterName</pre>
+ *
+ * <p><b>Example — database created ON CLUSTER:</b>
+ *
+ * <pre>
+ * -- Gravitino issues:
+ * CREATE DATABASE `my_db` ON CLUSTER `ck_cluster`
+ *   COMMENT 'my comment\n[Gravitino] ch.cluster=ck_cluster'
+ *
+ * -- ClickHouse SHOW CREATE DATABASE output (ON CLUSTER is omitted):
+ * CREATE DATABASE my_db
+ * ENGINE = Atomic
+ * COMMENT 'my comment
+ * [Gravitino] ch.cluster=ck_cluster'
+ *
+ * -- Gravitino reads back from system.databases.comment:
+ * --   stored : 'my comment\n[Gravitino] ch.cluster=ck_cluster'
+ * --   cluster: 'ck_cluster'   (extracted by extractClusterFromComment)
+ * --   user   : 'my comment'   (stripped by stripClusterMetadata)
+ * </pre>
+ *
+ * <p><b>Example — table created ON CLUSTER:</b>
+ *
+ * <pre>
+ * -- Gravitino issues:
+ * CREATE TABLE `orders` ON CLUSTER `ck_cluster`
+ *   (`id` Int32, `amount` Decimal(10,2))
+ *   ENGINE = MergeTree()
+ *   ORDER BY `id`
+ *   COMMENT 'order records\n[Gravitino] ch.cluster=ck_cluster'
+ *
+ * -- ClickHouse SHOW CREATE TABLE output (ON CLUSTER is omitted):
+ * CREATE TABLE default.orders
+ * (
+ *   `id` Int32,
+ *   `amount` Decimal(10, 2)
+ * )
+ * ENGINE = MergeTree
+ * ORDER BY id
+ * COMMENT 'order records
+ * [Gravitino] ch.cluster=ck_cluster'
+ *
+ * -- Gravitino reads back from system.tables.comment:
+ * --   stored : 'order records\n[Gravitino] ch.cluster=ck_cluster'
+ * --   cluster: 'ck_cluster'      (extracted by extractClusterFromComment)
+ * --   user   : 'order records'   (stripped by stripClusterMetadata)
+ * </pre>
+ *
+ * <p><b>Limitation:</b> This mechanism only works for databases and tables created through
+ * Gravitino. If a database or table was created directly in ClickHouse (bypassing Gravitino),
+ * Gravitino has no way to determine whether it was created {@code ON CLUSTER} or which cluster
+ * name was used. In that case {@link #extractClusterFromComment} returns {@code null} and the
+ * {@code on-cluster} / {@code cluster-name} properties reported by Gravitino will be absent or
+ * inaccurate.
+ */
+public final class ClickHouseClusterUtils {
+
+  /**
+   * Prefix that marks the start of the embedded cluster metadata line. The {@code [Gravitino]}
+   * attribution tells anyone reading the raw comment (e.g. via {@code SHOW CREATE TABLE}) that
+   * this line was written by Gravitino and should not be edited manually. The leading newline
+   * keeps the metadata on its own line for readability.
+   */
+  @VisibleForTesting public static final String CLUSTER_META_PREFIX = "\n[Gravitino] ch.cluster=";
+
+  private ClickHouseClusterUtils() {}
+
+  /**
+   * Appends cluster metadata to {@code comment}, producing the string that will be stored in
+   * ClickHouse's {@code COMMENT} field.
+   *
+   * @param comment The user-visible comment (may be {@code null} or the Gravitino-encoded
+   *     comment string).
+   * @param clusterName The cluster name to embed.
+   * @return The combined string: {@code comment\n[Gravitino] ch.cluster=clusterName}.
+   */
+  public static String embedClusterInComment(String comment, String clusterName) {
+    return StringUtils.defaultString(comment) + CLUSTER_META_PREFIX + clusterName;
+  }
+
+  /**
+   * Extracts the cluster name embedded by {@link #embedClusterInComment}, or {@code null} if
+   * none is present (e.g., the object was not created {@code ON CLUSTER} through Gravitino, or
+   * was created by a third-party tool).
+   *
+   * @param storedComment The raw comment as stored in ClickHouse.
+   * @return The cluster name, or {@code null}.
+   */
+  public static String extractClusterFromComment(String storedComment) {
+    if (storedComment == null) {
+      return null;
+    }
+    int idx = storedComment.indexOf(CLUSTER_META_PREFIX);
+    if (idx < 0) {
+      return null;
+    }
+    return storedComment.substring(idx + CLUSTER_META_PREFIX.length());
+  }
+
+  /**
+   * Returns the user-visible portion of the stored comment, stripping any embedded cluster
+   * metadata suffix. Returns {@code null} if {@code storedComment} is {@code null}.
+   *
+   * @param storedComment The raw comment as stored in ClickHouse.
+   * @return The comment without cluster metadata.
+   */
+  public static String stripClusterMetadata(String storedComment) {
+    if (storedComment == null) {
+      return null;
+    }
+    int idx = storedComment.indexOf(CLUSTER_META_PREFIX);
+    return idx < 0 ? storedComment : storedComment.substring(0, idx);
+  }
+
+  /**
+   * Escapes single-quote characters in {@code text} for use inside a ClickHouse SQL string
+   * literal delimited by single quotes (i.e. replaces each {@code '} with {@code ''}).
+   *
+   * @param text the raw string; must not be {@code null}.
+   * @return the escaped string.
+   */
+  public static String escapeSingleQuotes(String text) {
+    return text.replace("'", "''");
+  }
+}
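Not part of the commit: the round trip the new helpers implement can be sanity-checked in isolation. The sketch below copies the three method bodies from the file above into a standalone class (the class name is illustrative) so it compiles without the catalog module:

```java
// Standalone round-trip check for the comment-embedding scheme used by
// ClickHouseClusterUtils (helper bodies copied from the patch above).
public class ClusterCommentRoundTrip {
  static final String CLUSTER_META_PREFIX = "\n[Gravitino] ch.cluster=";

  static String embedClusterInComment(String comment, String clusterName) {
    return (comment == null ? "" : comment) + CLUSTER_META_PREFIX + clusterName;
  }

  static String extractClusterFromComment(String storedComment) {
    if (storedComment == null) return null;
    int idx = storedComment.indexOf(CLUSTER_META_PREFIX);
    return idx < 0 ? null : storedComment.substring(idx + CLUSTER_META_PREFIX.length());
  }

  static String stripClusterMetadata(String storedComment) {
    if (storedComment == null) return null;
    int idx = storedComment.indexOf(CLUSTER_META_PREFIX);
    return idx < 0 ? storedComment : storedComment.substring(0, idx);
  }

  public static void main(String[] args) {
    String stored = embedClusterInComment("my comment", "ck_cluster");
    System.out.println(extractClusterFromComment(stored)); // ck_cluster
    System.out.println(stripClusterMetadata(stored));      // my comment
    // Objects created outside Gravitino carry no metadata line:
    System.out.println(extractClusterFromComment("plain comment")); // null
  }
}
```

The round trip is lossless for the user comment because the metadata prefix starts with a newline, which cleanly delimits the appended line.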
diff --git a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseDatabaseOperations.java b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseDatabaseOperations.java
index 7ae9a76de8..0d4ba9b7ea 100644
--- a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseDatabaseOperations.java
+++ b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseDatabaseOperations.java
@@ -18,8 +18,16 @@
  */
 package org.apache.gravitino.catalog.clickhouse.operations;
 
+import static org.apache.gravitino.catalog.clickhouse.operations.ClickHouseClusterUtils.embedClusterInComment;
+import static org.apache.gravitino.catalog.clickhouse.operations.ClickHouseClusterUtils.escapeSingleQuotes;
+import static org.apache.gravitino.catalog.clickhouse.operations.ClickHouseClusterUtils.extractClusterFromComment;
+import static org.apache.gravitino.catalog.clickhouse.operations.ClickHouseClusterUtils.stripClusterMetadata;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -31,17 +39,23 @@ import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.StringIdentifier;
 import org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.ClusterConstants;
+import org.apache.gravitino.catalog.jdbc.JdbcSchema;
 import org.apache.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations;
 import org.apache.gravitino.catalog.jdbc.utils.JdbcConnectorUtils;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.meta.AuditInfo;
 
 public class ClickHouseDatabaseOperations extends JdbcDatabaseOperations {
 
   private static final Set<String> CLICK_HOUSE_SYSTEM_DATABASES =
       ImmutableSet.of("information_schema", "default", "system", 
"INFORMATION_SCHEMA");
 
-  // TODO: handle ClickHouse cluster properly when creating/dropping databases/tables
-  //  use https://github.com/apache/gravitino/issues/9820 to track it.
-
+  /**
+   * Background: ClickHouse's {@code SHOW CREATE DATABASE} does not include {@code ON CLUSTER}
+   * in its output for non-Replicated (Atomic) databases, and {@code system.databases.cluster}
+   * is only populated for {@code Replicated} engine databases. Gravitino therefore embeds the
+   * cluster name in the COMMENT field (see {@link ClickHouseClusterUtils}) so it can be
+   * retrieved at DROP time.
+   */
   @Override
   protected boolean supportSchemaComment() {
     return true;
@@ -91,10 +105,12 @@ public class ClickHouseDatabaseOperations extends JdbcDatabaseOperations {
     if (onCluster(properties)) {
       String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
       createDatabaseSql.append(String.format(" ON CLUSTER `%s`", clusterName));
-    }
-
-    if (StringUtils.isNotEmpty(originComment)) {
-      createDatabaseSql.append(String.format(" COMMENT '%s'", originComment));
+      // Embed the cluster name into the COMMENT so it can be retrieved later (e.g., at DROP time).
+      // ClickHouse does not persist ON CLUSTER info in SHOW CREATE DATABASE for Atomic databases.
+      String storedComment = embedClusterInComment(originComment, clusterName);
+      createDatabaseSql.append(String.format(" COMMENT '%s'", escapeSingleQuotes(storedComment)));
+    } else if (StringUtils.isNotEmpty(originComment)) {
+      createDatabaseSql.append(String.format(" COMMENT '%s'", escapeSingleQuotes(originComment)));
     }
 
    LOG.info("Generated create database:{} sql: {}", databaseName, createDatabaseSql);
@@ -120,16 +136,113 @@ public class ClickHouseDatabaseOperations extends JdbcDatabaseOperations {
     }
   }
 
+  @Override
+  public JdbcSchema load(String databaseName) throws NoSuchSchemaException {
+    try (Connection connection = getConnection()) {
+      connection.setCatalog(createSysDatabaseNameSet().iterator().next());
+      try (PreparedStatement stmt =
+          connection.prepareStatement(
+              "SELECT name, comment FROM system.databases WHERE name = ?")) {
+        stmt.setString(1, databaseName);
+        try (ResultSet rs = stmt.executeQuery()) {
+          if (!rs.next()) {
+            throw new NoSuchSchemaException("Database %s could not be found", databaseName);
+          }
+          String storedComment = rs.getString("comment");
+          String clusterName = extractClusterFromComment(storedComment);
+          String userComment = stripClusterMetadata(storedComment);
+
+          ImmutableMap.Builder<String, String> propsBuilder = ImmutableMap.builder();
+          if (StringUtils.isNotBlank(clusterName)) {
+            propsBuilder.put(ClusterConstants.ON_CLUSTER, String.valueOf(true));
+            propsBuilder.put(ClusterConstants.CLUSTER_NAME, clusterName);
+          }
+
+          return JdbcSchema.builder()
+              .withName(databaseName)
+              .withComment(userComment)
+              .withProperties(propsBuilder.build())
+              .withAuditInfo(AuditInfo.EMPTY)
+              .build();
+        }
+      }
+    } catch (final SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
+  }
+
   @Override
   protected void dropDatabase(String databaseName, boolean cascade) {
     try (final Connection connection = getConnection()) {
       connection.setCatalog(createSysDatabaseNameSet().iterator().next());
-      JdbcConnectorUtils.executeUpdate(connection, generateDropDatabaseSql(databaseName, cascade));
+      if (!cascade) {
+        // Query system.tables instead of SHOW TABLES IN so we get a proper parameterised query
+        // and avoid any edge cases with special characters in database names.
+
+        // Note that querying the system table does not cover every case: if the database does
+        // not exist on this host (e.g. in a cluster setup), the query returns an empty result
+        // and we proceed to drop the database anyway.
+        try (PreparedStatement checkStmt =
+            connection.prepareStatement("SELECT 1 FROM system.tables WHERE database = ? LIMIT 1")) {
+          checkStmt.setString(1, databaseName);
+          try (ResultSet rs = checkStmt.executeQuery()) {
+            if (rs.next()) {
+              throw new IllegalArgumentException(
+                  String.format(
+                      "Database %s is not empty, you can drop it with CASCADE option.",
+                      databaseName));
+            }
+          }
+        }
+      }
+      // Read the cluster name stored in the COMMENT at CREATE time.
+      // SHOW CREATE DATABASE does not include ON CLUSTER info for non-Replicated databases.
+      String clusterName = readClusterName(connection, databaseName);
+      JdbcConnectorUtils.executeUpdate(
+          connection, generateDropDatabaseSql(databaseName, clusterName));
     } catch (final SQLException se) {
       throw this.exceptionMapper.toGravitinoException(se);
     }
   }
 
+  /**
+   * Generates the SQL statement to drop a ClickHouse database. If {@code clusterName} is
+   * non-blank, the DROP statement includes {@code ON CLUSTER `clusterName`} so the operation is
+   * propagated to every cluster node.
+   *
+   * @param databaseName The name of the database to drop.
+   * @param clusterName The cluster name read from the stored comment, or {@code null}/blank if
+   *     not a cluster database.
+   * @return The DROP DATABASE SQL statement.
+   */
+  @VisibleForTesting
+  String generateDropDatabaseSql(String databaseName, String clusterName) {
+    StringBuilder sql = new StringBuilder(String.format("DROP DATABASE `%s`", databaseName));
+    if (StringUtils.isNotBlank(clusterName)) {
+      sql.append(String.format(" ON CLUSTER `%s`", clusterName));
+    }
+    LOG.info("Generated drop database:{} sql: {}", databaseName, sql);
+    return sql.toString();
+  }
+
+  /**
+   * Reads the cluster name from {@code system.databases.comment} for the given database. Returns
+   * {@code null} if the database has no embedded cluster metadata (i.e., it was created without
+   * {@code ON CLUSTER}).
+   */
+  private String readClusterName(Connection connection, String databaseName) throws SQLException {
+    try (PreparedStatement stmt =
+        connection.prepareStatement("SELECT comment FROM system.databases WHERE name = ?")) {
+      stmt.setString(1, databaseName);
+      try (ResultSet rs = stmt.executeQuery()) {
+        if (rs.next()) {
+          return extractClusterFromComment(rs.getString("comment"));
+        }
+      }
+    }
+    return null;
+  }
+
   private boolean onCluster(Map<String, String> dbProperties) {
     if (MapUtils.isEmpty(dbProperties)) {
       return false;
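Not part of the commit: the DROP DATABASE path above is easy to sanity-check without JDBC. This standalone sketch mirrors the `generateDropDatabaseSql` logic (class name is illustrative; the real method lives in `ClickHouseDatabaseOperations`):

```java
// Mirrors ClickHouseDatabaseOperations#generateDropDatabaseSql: append
// ON CLUSTER only when a non-blank cluster name was recovered from the
// stored comment; otherwise emit a plain single-node DROP.
public class DropDatabaseSqlSketch {
  static String generateDropDatabaseSql(String databaseName, String clusterName) {
    StringBuilder sql = new StringBuilder(String.format("DROP DATABASE `%s`", databaseName));
    if (clusterName != null && !clusterName.trim().isEmpty()) {
      sql.append(String.format(" ON CLUSTER `%s`", clusterName));
    }
    return sql.toString();
  }

  public static void main(String[] args) {
    System.out.println(generateDropDatabaseSql("my_db", "ck_cluster"));
    // DROP DATABASE `my_db` ON CLUSTER `ck_cluster`
    System.out.println(generateDropDatabaseSql("my_db", null));
    // DROP DATABASE `my_db`
  }
}
```

A `null` or blank cluster name (the case where the comment carried no embedded metadata) degrades gracefully to a single-node DROP.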
diff --git a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java
index b0b566b9b8..4c453b7e7a 100644
--- a/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java
+++ b/catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java
@@ -23,6 +23,7 @@ import static org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.IndexC
 import static org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.CLICKHOUSE_ENGINE_KEY;
 import static org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.ENGINE_PROPERTY_ENTRY;
 import static org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.GRAVITINO_ENGINE_KEY;
+import static org.apache.gravitino.catalog.clickhouse.operations.ClickHouseClusterUtils.escapeSingleQuotes;
 import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -81,11 +82,11 @@ public class ClickHouseTableOperations extends JdbcTableOperations {
   private static final String CLICKHOUSE_NOT_SUPPORT_NESTED_COLUMN_MSG =
       "Clickhouse does not support nested column names.";
   private static final Pattern ORDER_BY_PATTERN =
-      Pattern.compile("ORDER\\s+BY\\s+([^\\n]+)", Pattern.CASE_INSENSITIVE);
+      Pattern.compile(
+          "(?is)\\bORDER\\s+BY\\s*(.+?)(?=\\bPARTITION\\s+BY\\b|\\bPRIMARY\\s+KEY\\b|\\bSAMPLE\\s+BY\\b|\\bTTL\\b|\\bSETTINGS\\b|\\bCOMMENT\\b|$)");
   private static final Pattern PARTITION_BY_PATTERN =
-      Pattern.compile("PARTITION\\s+BY\\s+([^\\n]+)", Pattern.CASE_INSENSITIVE);
-  private static final Pattern ON_CLUSTER_PATTERN =
-      Pattern.compile("(?i)\\bON\\s+CLUSTER\\s+`?([^`\\s(]+)`?");
+      Pattern.compile(
+          "(?is)\\bPARTITION\\s+BY\\s*(.+?)(?=\\bORDER\\s+BY\\b|\\bPRIMARY\\s+KEY\\b|\\bSAMPLE\\s+BY\\b|\\bTTL\\b|\\bSETTINGS\\b|\\bCOMMENT\\b|$)");
   private static final Pattern DISTRIBUTED_ENGINE_PATTERN =
       Pattern.compile(
           "(?i)^Distributed\\(([^,]+),\\s*([^,]+),\\s*([^,]+),\\s*(.+)\\)$", Pattern.DOTALL);
@@ -165,8 +166,10 @@ public class ClickHouseTableOperations extends JdbcTableOperations {
     Map<String, String> notNullProperties =
         MapUtils.isNotEmpty(properties) ? properties : Collections.emptyMap();
 
-    // Add Create table clause
-    appendCreateTableClause(notNullProperties, sqlBuilder, tableName);
+    validateNoAutoIncrementColumns(columns);
+
+    // Add Create table clause; capture whether ON CLUSTER is in use
+    boolean onCluster = appendCreateTableClause(notNullProperties, sqlBuilder, tableName);
 
     // We still allow empty columns when the engine is distributed.
     if (columns.length > 0) {
@@ -186,10 +189,15 @@ public class ClickHouseTableOperations extends JdbcTableOperations {
 
     appendPartitionClause(partitioning, sqlBuilder, engine);
 
-    // Add table comment if specified
-    if (StringUtils.isNotEmpty(comment)) {
-      String escapedComment = comment.replace("'", "''");
-      sqlBuilder.append(" COMMENT '%s'".formatted(escapedComment));
+    // Add table comment; embed cluster name so it can be recovered at DROP/ALTER time.
+    // ClickHouse does not persist ON CLUSTER in SHOW CREATE TABLE (see ClickHouseClusterUtils).
+    String storedComment =
+        onCluster
+            ? ClickHouseClusterUtils.embedClusterInComment(
+                comment, notNullProperties.get(ClusterConstants.CLUSTER_NAME))
+            : comment;
+    if (StringUtils.isNotEmpty(storedComment)) {
+      sqlBuilder.append(" COMMENT '%s'".formatted(escapeSingleQuotes(storedComment)));
     }
 
     // Add setting clause if specified, clickhouse only supports predefine 
settings
@@ -427,6 +435,20 @@ public class ClickHouseTableOperations extends JdbcTableOperations {
     return quoteIdentifier(fieldName);
   }
 
+  private void validateNoAutoIncrementColumns(JdbcColumn[] columns) {
+    if (ArrayUtils.isEmpty(columns)) {
+      return;
+    }
+
+    for (JdbcColumn column : columns) {
+      if (column.autoIncrement()) {
+        throw new UnsupportedOperationException(
+            "ClickHouse does not support auto increment column: '%s' in CREATE TABLE"
+                .formatted(column.name()));
+      }
+    }
+  }
+
   private void buildColumnsDefinition(JdbcColumn[] columns, StringBuilder 
sqlBuilder) {
     if (ArrayUtils.isEmpty(columns)) {
       return;
@@ -531,14 +553,17 @@ public class ClickHouseTableOperations extends JdbcTableOperations {
             return Collections.unmodifiableMap(
                 new HashMap<String, String>() {
                   {
-                    put(COMMENT, resultSet.getString(COMMENT));
+                    // Extract cluster name embedded in the COMMENT at create time.
+                    // SHOW CREATE TABLE does not include ON CLUSTER (see ClickHouseClusterUtils).
+                    String storedComment = resultSet.getString(COMMENT);
+                    String clusterName =
+                        ClickHouseClusterUtils.extractClusterFromComment(storedComment);
+                    put(COMMENT, ClickHouseClusterUtils.stripClusterMetadata(storedComment));
                     String engine = resultSet.getString(CLICKHOUSE_ENGINE_KEY);
                     put(GRAVITINO_ENGINE_KEY, engine);
-                    String createSql = parseShowCreateTableSql(connection, tableName);
-                    Matcher onClusterMatcher = ON_CLUSTER_PATTERN.matcher(createSql);
-                    if (onClusterMatcher.find()) {
+                    if (StringUtils.isNotBlank(clusterName)) {
                       put(ClusterConstants.ON_CLUSTER, String.valueOf(true));
-                      put(ClusterConstants.CLUSTER_NAME, unquote(onClusterMatcher.group(1)));
+                      put(ClusterConstants.CLUSTER_NAME, clusterName);
                     } else {
                       put(ClusterConstants.ON_CLUSTER, String.valueOf(false));
                     }
@@ -652,6 +677,43 @@ public class ClickHouseTableOperations extends JdbcTableOperations {
     return metaData.getTables(catalogName, schemaName, null, new String[] 
{"TABLE"});
   }
 
+  @Override
+  protected void dropTable(String databaseName, String tableName) {
+    LOG.info("Attempting to delete table {} from database {}", tableName, databaseName);
+    try (Connection connection = getConnection(databaseName)) {
+      Map<String, String> props = getTableProperties(connection, tableName);
+      JdbcConnectorUtils.executeUpdate(connection, generateDropTableSql(tableName, props));
+      LOG.info("Deleted table {} from database {}", tableName, databaseName);
+    } catch (final SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
+  }
+
+  /**
+   * Generates the SQL statement to drop a ClickHouse table. When the table was created with
+   * {@code ON CLUSTER}, the DROP statement includes {@code ON CLUSTER `clusterName` SYNC} so
+   * the operation is propagated to every cluster node synchronously.
+   *
+   * @param tableName The name of the table to drop.
+   * @param properties The table properties as returned by {@link #getTableProperties}; used to
+   *     determine whether the table is on a cluster.
+   * @return The DROP TABLE SQL statement.
+   */
+  @VisibleForTesting
+  String generateDropTableSql(String tableName, Map<String, String> properties) {
+    String clusterName = properties == null ? null : properties.get(ClusterConstants.CLUSTER_NAME);
+    boolean onCluster =
+        properties != null
+            && Boolean.parseBoolean(properties.getOrDefault(ClusterConstants.ON_CLUSTER, "false"));
+
+    if (onCluster && StringUtils.isNotBlank(clusterName)) {
+      return String.format(
+          "DROP TABLE %s ON CLUSTER %s SYNC",
+          quoteIdentifier(tableName), quoteIdentifier(clusterName));
+    }
+    return String.format("DROP TABLE %s", quoteIdentifier(tableName));
+  }
+
   @Override
   protected String generatePurgeTableSql(String tableName) {
     throw new UnsupportedOperationException(
@@ -719,6 +781,10 @@ public class ClickHouseTableOperations extends JdbcTableOperations {
             updateColumnNullabilityDefinition(
                 (TableChange.UpdateColumnNullability) change, lazyLoadTable));
 
+      } else if (change instanceof TableChange.AddIndex addIndex) {
+        lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable);
+        alterSql.add(addIndexDefinition(lazyLoadTable, addIndex));
+
       } else if (change instanceof TableChange.DeleteIndex) {
        lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable);
        alterSql.add(deleteIndexDefinition(lazyLoadTable, (TableChange.DeleteIndex) change));
@@ -737,16 +803,23 @@ public class ClickHouseTableOperations extends JdbcTableOperations {
     // Last modified comment
     if (null != updateComment) {
       String newComment = updateComment.getNewComment();
+      // Load the existing table once. We need it for two purposes:
+      //   1. Preserve the Gravitino StringIdentifier embedded in the old comment, so Gravitino
+      //      can still identify the table after the comment is changed.
+      //   2. Re-embed the cluster name so it is not lost. ClickHouse does not persist ON CLUSTER
+      //      in SHOW CREATE TABLE, so the cluster name lives only in the stored comment.
+      lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable);
       if (null == StringIdentifier.fromComment(newComment)) {
-        // Detect and add Gravitino id.
-        JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName, lazyLoadTable);
-        StringIdentifier identifier = StringIdentifier.fromComment(jdbcTable.comment());
+        StringIdentifier identifier = StringIdentifier.fromComment(lazyLoadTable.comment());
         if (null != identifier) {
           newComment = StringIdentifier.addToComment(identifier, newComment);
         }
       }
-      String escapedComment = newComment.replace("'", "''");
-      alterSql.add(" MODIFY COMMENT '%s'".formatted(escapedComment));
+      String clusterName = lazyLoadTable.properties().get(ClusterConstants.CLUSTER_NAME);
+      if (StringUtils.isNotBlank(clusterName)) {
+        newComment = ClickHouseClusterUtils.embedClusterInComment(newComment, clusterName);
+      }
+      alterSql.add(" MODIFY COMMENT '%s'".formatted(escapeSingleQuotes(newComment)));
     }
 
     if (!setProperties.isEmpty()) {
@@ -768,6 +841,36 @@ public class ClickHouseTableOperations extends JdbcTableOperations {
     return result;
   }
 
+  private String addIndexDefinition(JdbcTable table, TableChange.AddIndex addIndex) {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(addIndex.getName()), "Index name is required");
+    Preconditions.checkArgument(
+        ArrayUtils.isNotEmpty(addIndex.getFieldNames()), "Index field names are required");
+
+    boolean indexExists =
+        Arrays.stream(table.index()).anyMatch(index -> index.name().equals(addIndex.getName()));
+    Preconditions.checkArgument(!indexExists, "Index '%s' already exists", addIndex.getName());
+
+    String fieldStr = getIndexFieldStr(addIndex.getFieldNames());
+    switch (addIndex.getType()) {
+      case DATA_SKIPPING_MINMAX:
+        return "ADD INDEX %s %s TYPE minmax GRANULARITY 1"
+            .formatted(quoteIdentifier(addIndex.getName()), fieldStr);
+
+      case DATA_SKIPPING_BLOOM_FILTER:
+        return "ADD INDEX %s %s TYPE bloom_filter GRANULARITY 3"
+            .formatted(quoteIdentifier(addIndex.getName()), fieldStr);
+
+      case PRIMARY_KEY:
+        throw new UnsupportedOperationException(
+            "ClickHouse does not support adding primary key via ALTER TABLE");
+
+      default:
+        throw new IllegalArgumentException(
+            "Gravitino ClickHouse doesn't support index type: " + addIndex.getType());
+    }
+  }
+
   @VisibleForTesting
   private String deleteIndexDefinition(
       JdbcTable lazyLoadTable, TableChange.DeleteIndex deleteIndex) {
@@ -1017,6 +1120,11 @@ public class ClickHouseTableOperations extends JdbcTableOperations {
     return metadata;
   }
 
+  @VisibleForTesting
+  SortOrder[] parseSortOrdersFromCreateSql(String createSql) {
+    return parseCreateStatement(createSql).sortOrders;
+  }
+
  private ShowCreateTableMetadata parseShowCreateTable(Connection connection, String tableName)
       throws SQLException {
     String createSql = parseShowCreateTableSql(connection, tableName);
diff --git a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/integration/test/CatalogClickHouseClusterIT.java b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/integration/test/CatalogClickHouseClusterIT.java
index e80f8d4ea5..7e7187277a 100644
--- a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/integration/test/CatalogClickHouseClusterIT.java
+++ b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/integration/test/CatalogClickHouseClusterIT.java
@@ -33,9 +33,11 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.Statement;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
@@ -52,11 +54,15 @@ import org.apache.gravitino.integration.test.util.TestDatabaseName;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
 import org.apache.gravitino.rel.expressions.distributions.Distributions;
 import org.apache.gravitino.rel.expressions.literals.Literals;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
 import org.apache.gravitino.rel.types.Types;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -221,8 +227,14 @@ public class CatalogClickHouseClusterIT extends BaseIT {
     Table loadedLocalTable = tableCatalog.loadTable(localTableIdent);
     Assertions.assertEquals(
         ENGINE.MERGETREE.getValue(), loadedLocalTable.properties().get(GRAVITINO_ENGINE_KEY));
-    Assertions.assertEquals("false", loadedLocalTable.properties().get(ON_CLUSTER));
-    Assertions.assertFalse(loadedLocalTable.properties().containsKey(CLUSTER_NAME));
+    // Cluster metadata is now embedded in COMMENT at create time and recovered on load.
+    Assertions.assertEquals("true", loadedLocalTable.properties().get(ON_CLUSTER));
+    Assertions.assertEquals(
+        ClickHouseContainer.DEFAULT_CLUSTER_NAME, loadedLocalTable.properties().get(CLUSTER_NAME));
+    Assertions.assertEquals(1, loadedLocalTable.sortOrder().length);
+    Assertions.assertTrue(loadedLocalTable.sortOrder()[0].expression() instanceof NamedReference);
+    Assertions.assertEquals(
+        "col_3", ((NamedReference) loadedLocalTable.sortOrder()[0].expression()).fieldName()[0]);
 
     Table distributedTable =
         tableCatalog.createTable(
@@ -239,7 +251,8 @@ public class CatalogClickHouseClusterIT extends BaseIT {
     Assertions.assertEquals(
         ENGINE.DISTRIBUTED.getValue(),
         loadedDistributedTable.properties().get(GRAVITINO_ENGINE_KEY));
-    Assertions.assertEquals("false", loadedDistributedTable.properties().get(ON_CLUSTER));
+    // Cluster metadata embedded in COMMENT at create time; ON_CLUSTER is now recoverable.
+    Assertions.assertEquals("true", loadedDistributedTable.properties().get(ON_CLUSTER));
     Assertions.assertEquals(
         ClickHouseContainer.DEFAULT_CLUSTER_NAME,
         loadedDistributedTable.properties().get(CLUSTER_NAME));
@@ -263,6 +276,12 @@ public class CatalogClickHouseClusterIT extends BaseIT {
         ENGINE.MERGETREE.getValue(), loadedNonClusterTable.properties().get(GRAVITINO_ENGINE_KEY));
     Assertions.assertEquals("false", loadedNonClusterTable.properties().get(ON_CLUSTER));
     Assertions.assertFalse(loadedNonClusterTable.properties().containsKey(CLUSTER_NAME));
+    Assertions.assertEquals(1, loadedNonClusterTable.sortOrder().length);
+    Assertions.assertTrue(
+        loadedNonClusterTable.sortOrder()[0].expression() instanceof NamedReference);
+    Assertions.assertEquals(
+        "col_3",
+        ((NamedReference) loadedNonClusterTable.sortOrder()[0].expression()).fieldName()[0]);
 
     try (Connection connection =
             DriverManager.getConnection(
@@ -356,4 +375,374 @@ public class CatalogClickHouseClusterIT extends BaseIT {
     Assertions.assertEquals(
         sqlNonClusterLocalTableName, sqlNonClusterDistributedTable.properties().get(REMOTE_TABLE));
   }
+
+  @Test
+  public void testAlterTableBranchCoverageInCluster() {
+    String alterTableName = GravitinoITUtils.genRandomName("ck_cluster_alter");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, alterTableName);
+    Column[] columns = createColumns();
+    Index[] indexes =
+        new Index[] {
+          Indexes.of(Index.IndexType.DATA_SKIPPING_MINMAX, "idx_col_2", new String[][] {{"col_2"}})
+        };
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        tableComment,
+        clusterMergeTreeProperties(),
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        getSortOrders("col_3"),
+        indexes);
+
+    tableCatalog.alterTable(
+        tableIdentifier,
+        TableChange.addColumn(
+            new String[] {"col_4"},
+            Types.StringType.get(),
+            "new col",
+            TableChange.ColumnPosition.first()),
+        TableChange.updateColumnPosition(
+            new String[] {"col_2"}, TableChange.ColumnPosition.after("col_3")));
+    tableCatalog.alterTable(
+        tableIdentifier,
+        TableChange.updateColumnType(new String[] {"col_1"}, Types.LongType.get()));
+    tableCatalog.alterTable(
+        tableIdentifier,
+        TableChange.updateColumnComment(new String[] {"col_1"}, "col_1_new_comment"));
+    tableCatalog.alterTable(
+        tableIdentifier,
+        TableChange.updateColumnDefaultValue(
+            new String[] {"col_1"}, Literals.of("2", Types.LongType.get())));
+    tableCatalog.alterTable(
+        tableIdentifier, TableChange.updateColumnNullability(new String[] {"col_1"}, true));
+    tableCatalog.alterTable(tableIdentifier, TableChange.deleteIndex("idx_col_2", false));
+
+    Table alteredTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(4, alteredTable.columns().length);
+    Assertions.assertEquals("col_4", alteredTable.columns()[0].name());
+    int col2Position = -1;
+    int col3Position = -1;
+    for (int i = 0; i < alteredTable.columns().length; i++) {
+      if (Objects.equals("col_2", alteredTable.columns()[i].name())) {
+        col2Position = i;
+      }
+      if (Objects.equals("col_3", alteredTable.columns()[i].name())) {
+        col3Position = i;
+      }
+    }
+    Assertions.assertTrue(col2Position > col3Position, "col_2 should appear after col_3");
+    Column alteredCol1 =
+        Arrays.stream(alteredTable.columns())
+            .filter(column -> Objects.equals("col_1", column.name()))
+            .findFirst()
+            .orElseThrow(() -> new AssertionError("col_1 should exist"));
+    Assertions.assertTrue(alteredCol1.nullable());
+    Assertions.assertEquals(Types.LongType.get(), alteredCol1.dataType());
+    Assertions.assertEquals("col_1_new_comment", alteredCol1.comment());
+    Assertions.assertEquals(Literals.of("2", Types.LongType.get()), alteredCol1.defaultValue());
+    Assertions.assertFalse(
+        Arrays.stream(alteredTable.index())
+            .anyMatch(index -> Objects.equals(index.name(), "idx_col_2")));
+
+    Assertions.assertDoesNotThrow(
+        () ->
+            tableCatalog.alterTable(
+                tableIdentifier,
+                TableChange.deleteIndex("missing_idx", true),
+                TableChange.deleteColumn(new String[] {"missing_col"}, true)));
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            tableCatalog.alterTable(
+                tableIdentifier, TableChange.deleteIndex("missing_idx", false)));
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            tableCatalog.alterTable(
+                tableIdentifier, TableChange.deleteColumn(new String[] {"missing_col"}, false)));
+    Assertions.assertThrows(
+        UnsupportedOperationException.class,
+        () -> tableCatalog.alterTable(tableIdentifier, TableChange.setProperty("k", "v")));
+    Assertions.assertThrows(
+        UnsupportedOperationException.class,
+        () -> tableCatalog.alterTable(tableIdentifier, TableChange.removeProperty("k")));
+    Assertions.assertThrows(
+        UnsupportedOperationException.class,
+        () ->
+            tableCatalog.alterTable(
+                tableIdentifier,
+                TableChange.updateColumnAutoIncrement(new String[] {"col_1"}, true)));
+  }
+
+  @Test
+  public void testAlterAddIndexAndAutoIncrementInCluster() {
+    String tableName = GravitinoITUtils.genRandomName("ck_cluster_alter_idx");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Column[] columns = createColumns();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        tableComment,
+        clusterMergeTreeProperties(),
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        getSortOrders("col_3"),
+        Indexes.EMPTY_INDEXES);
+
+    tableCatalog.alterTable(
+        tableIdentifier,
+        TableChange.addIndex(
+            Index.IndexType.DATA_SKIPPING_MINMAX, "idx_col_1_new", new String[][] {{"col_1"}}));
+    Table loaded = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertTrue(
+        Arrays.stream(loaded.index())
+            .anyMatch(index -> Objects.equals(index.name(), "idx_col_1_new")));
+    tableCatalog.alterTable(tableIdentifier, TableChange.deleteIndex("idx_col_1_new", false));
+    loaded = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertFalse(
+        Arrays.stream(loaded.index())
+            .anyMatch(index -> Objects.equals(index.name(), "idx_col_1_new")));
+
+    RuntimeException autoIncrementTrueException =
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () ->
+                tableCatalog.alterTable(
+                    tableIdentifier,
+                    TableChange.updateColumnAutoIncrement(new String[] {"col_1"}, true)));
+    Assertions.assertTrue(
+        autoIncrementTrueException.getMessage().contains("auto increment is not supported"));
+
+    RuntimeException autoIncrementFalseException =
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () ->
+                tableCatalog.alterTable(
+                    tableIdentifier,
+                    TableChange.updateColumnAutoIncrement(new String[] {"col_1"}, false)));
+    Assertions.assertTrue(
+        autoIncrementFalseException.getMessage().contains("auto increment is not supported"));
+  }
+
+  @Test
+  public void testDropTableOnCluster() {
+    String dropTableName = GravitinoITUtils.genRandomName("ck_cluster_drop_tbl");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, dropTableName);
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+
+    // Create a MergeTree table on the cluster
+    tableCatalog.createTable(
+        tableIdentifier,
+        createColumns(),
+        tableComment,
+        clusterMergeTreeProperties(),
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        getSortOrders("col_3"),
+        Indexes.EMPTY_INDEXES);
+
+    Assertions.assertNotNull(tableCatalog.loadTable(tableIdentifier));
+
+    // Drop the table — Gravitino should issue DROP TABLE ... ON CLUSTER ... SYNC
+    boolean dropped = tableCatalog.dropTable(tableIdentifier);
+    Assertions.assertTrue(dropped);
+
+    // Verify the table no longer exists via Gravitino
+    Assertions.assertFalse(
+        Arrays.stream(tableCatalog.listTables(Namespace.of(schemaName)))
+            .anyMatch(id -> id.name().equals(dropTableName)));
+  }
+
+  @Test
+  public void testDropNonClusterTableDoesNotUseOnCluster() {
+    String dropTableName = GravitinoITUtils.genRandomName("ck_no_cluster_drop_tbl");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, dropTableName);
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+
+    // Create a MergeTree table without ON CLUSTER
+    tableCatalog.createTable(
+        tableIdentifier,
+        createColumns(),
+        tableComment,
+        Collections.singletonMap(GRAVITINO_ENGINE_KEY, ENGINE.MERGETREE.getValue()),
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        getSortOrders("col_3"),
+        Indexes.EMPTY_INDEXES);
+
+    Assertions.assertNotNull(tableCatalog.loadTable(tableIdentifier));
+
+    boolean dropped = tableCatalog.dropTable(tableIdentifier);
+    Assertions.assertTrue(dropped);
+
+    Assertions.assertFalse(
+        Arrays.stream(tableCatalog.listTables(Namespace.of(schemaName)))
+            .anyMatch(id -> id.name().equals(dropTableName)));
+  }
+
+  @Test
+  public void testDropSchemaOnCluster() {
+    String dropSchemaName = GravitinoITUtils.genRandomName("ck_cluster_drop_schema");
+
+    // Create a schema on the cluster
+    Map<String, String> schemaProps = new HashMap<>();
+    schemaProps.put(CLUSTER_NAME, ClickHouseContainer.DEFAULT_CLUSTER_NAME);
+    schemaProps.put(ON_CLUSTER, String.valueOf(true));
+    catalog.asSchemas().createSchema(dropSchemaName, null, schemaProps);
+    Assertions.assertNotNull(catalog.asSchemas().loadSchema(dropSchemaName));
+
+    // Drop the schema — Gravitino should issue DROP DATABASE ... ON CLUSTER ... SYNC
+    boolean dropped = catalog.asSchemas().dropSchema(dropSchemaName, false);
+    Assertions.assertTrue(dropped);
+
+    Assertions.assertFalse(
+        Arrays.stream(catalog.asSchemas().listSchemas()).anyMatch(s -> s.equals(dropSchemaName)));
+  }
+
+  // ---------------------------------------------------------------------------
+  // Cluster metadata round-trip via COMMENT embedding
+  // ---------------------------------------------------------------------------
+
+  /**
+   * Gravitino embeds the cluster name in the database COMMENT field at CREATE time, because SHOW
+   * CREATE DATABASE does not include ON CLUSTER. This test verifies the metadata survives the
+   * round-trip: createSchema → loadSchema → cluster properties present.
+   */
+  @Test
+  public void testLoadGravitinoCreatedSchemaOnClusterReturnsClusterProperties() {
+    String name = GravitinoITUtils.genRandomName("ck_cluster_schema_props");
+    Map<String, String> props = new HashMap<>();
+    props.put(CLUSTER_NAME, ClickHouseContainer.DEFAULT_CLUSTER_NAME);
+    props.put(ON_CLUSTER, String.valueOf(true));
+
+    catalog.asSchemas().createSchema(name, "schema comment", props);
+    try {
+      Schema loaded = catalog.asSchemas().loadSchema(name);
+      Assertions.assertEquals(
+          String.valueOf(true),
+          loaded.properties().get(ON_CLUSTER),
+          "loadSchema must return on-cluster=true for a Gravitino-created cluster schema");
+      Assertions.assertEquals(
+          ClickHouseContainer.DEFAULT_CLUSTER_NAME,
+          loaded.properties().get(CLUSTER_NAME),
+          "loadSchema must return the cluster name embedded in COMMENT at create time");
+    } finally {
+      catalog.asSchemas().dropSchema(name, true);
+    }
+  }
+
+  /**
+   * Verifies that a table created ON CLUSTER via Gravitino reports the correct cluster properties
+   * when loaded. SHOW CREATE TABLE does not include ON CLUSTER; Gravitino recovers the cluster name
+   * from the embedded COMMENT metadata.
+   */
+  @Test
+  public void testLoadGravitinoCreatedTableOnClusterReturnsClusterProperties() {
+    String name = GravitinoITUtils.genRandomName("ck_cluster_tbl_props");
+    NameIdentifier tableIdent = NameIdentifier.of(schemaName, name);
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+
+    tableCatalog.createTable(
+        tableIdent,
+        createColumns(),
+        "table comment",
+        clusterMergeTreeProperties(),
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        getSortOrders("col_3"),
+        Indexes.EMPTY_INDEXES);
+
+    try {
+      Table loaded = tableCatalog.loadTable(tableIdent);
+      Assertions.assertEquals(
+          String.valueOf(true),
+          loaded.properties().get(ON_CLUSTER),
+          "loadTable must return on-cluster=true for a Gravitino-created cluster table");
+      Assertions.assertEquals(
+          ClickHouseContainer.DEFAULT_CLUSTER_NAME,
+          loaded.properties().get(CLUSTER_NAME),
+          "loadTable must return the cluster name embedded in COMMENT at create time");
+    } finally {
+      tableCatalog.dropTable(tableIdent);
+    }
+  }
+
+  /**
+   * When a user updates the table comment via Gravitino, the cluster metadata embedded in the
+   * ClickHouse COMMENT field must be preserved. Without re-embedding, the next loadTable call would
+   * lose the cluster name.
+   */
+  @Test
+  public void testUpdateTableCommentOnClusterPreservesClusterProperties() {
+    String name = GravitinoITUtils.genRandomName("ck_cluster_tbl_upd_cmt");
+    NameIdentifier tableIdent = NameIdentifier.of(schemaName, name);
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+
+    tableCatalog.createTable(
+        tableIdent,
+        createColumns(),
+        "original comment",
+        clusterMergeTreeProperties(),
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        getSortOrders("col_3"),
+        Indexes.EMPTY_INDEXES);
+
+    try {
+      // Update the table comment
+      tableCatalog.alterTable(tableIdent, TableChange.updateComment("updated comment"));
+
+      // Cluster properties must still be present after the comment update
+      Table loaded = tableCatalog.loadTable(tableIdent);
+      Assertions.assertEquals(
+          String.valueOf(true),
+          loaded.properties().get(ON_CLUSTER),
+          "on-cluster must be preserved after a comment update");
+      Assertions.assertEquals(
+          ClickHouseContainer.DEFAULT_CLUSTER_NAME,
+          loaded.properties().get(CLUSTER_NAME),
+          "cluster-name must be preserved after a comment update");
+    } finally {
+      tableCatalog.dropTable(tableIdent);
+    }
+  }
+
+  /**
+   * Tables created directly in ClickHouse (not through Gravitino) have no cluster metadata embedded
+   * in their COMMENT field. Gravitino must report on-cluster=false and omit cluster-name for such
+   * tables, and document this as a known limitation.
+   */
+  @Test
+  public void testNonGravitinoCreatedClusterTableHasNoClusterProperties() {
+    String name = GravitinoITUtils.genRandomName("ck_sql_direct_cluster");
+
+    // Create directly in ClickHouse — no Gravitino cluster metadata in COMMENT
+    clickHouseService.executeQuery(
+        String.format(
+            "CREATE TABLE `%s`.`%s` ON CLUSTER `%s` "
+                + "(col_1 Int32, col_2 Date, col_3 String) "
+                + "ENGINE = MergeTree ORDER BY col_1",
+            schemaName, name, ClickHouseContainer.DEFAULT_CLUSTER_NAME));
+
+    try {
+      Table loaded = catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName, name));
+      // Without Gravitino COMMENT metadata there is no way to recover cluster info —
+      // this is the documented limitation.
+      Assertions.assertEquals(
+          String.valueOf(false),
+          loaded.properties().get(ON_CLUSTER),
+          "on-cluster must be false for non-Gravitino-created tables (no embedded COMMENT metadata)");
+      Assertions.assertFalse(
+          loaded.properties().containsKey(CLUSTER_NAME),
+          "cluster-name must be absent for non-Gravitino-created tables");
+    } finally {
+      clickHouseService.executeQuery(
+          String.format(
+              "DROP TABLE `%s`.`%s` ON CLUSTER `%s` SYNC",
+              schemaName, name, ClickHouseContainer.DEFAULT_CLUSTER_NAME));
+    }
+  }
 }
diff --git a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/integration/test/CatalogClickHouseIT.java b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/integration/test/CatalogClickHouseIT.java
index aeb2ed6c07..794189d0ff 100644
--- a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/integration/test/CatalogClickHouseIT.java
+++ b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/integration/test/CatalogClickHouseIT.java
@@ -1183,6 +1183,186 @@ public class CatalogClickHouseIT extends BaseIT {
         Literals.of("2.34", Types.DecimalType.of(3, 2)), table.columns()[4].defaultValue());
   }
 
+  @Test
+  void testAlterTableBranchCoverage() {
+    String branchTableName = GravitinoITUtils.genRandomName("alter_branch");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, branchTableName);
+    Column[] columns =
+        new Column[] {
+          Column.of(
+              "id", Types.IntegerType.get(), "id column", false, false, Literals.integerLiteral(1)),
+          Column.of(
+              "score", Types.IntegerType.get(), "score", false, false, Literals.integerLiteral(10)),
+          Column.of("note", Types.StringType.get(), "note")
+        };
+    Index[] indexes =
+        new Index[] {
+          Indexes.of(Index.IndexType.DATA_SKIPPING_MINMAX, "idx_note", new String[][] {{"note"}})
+        };
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        createProperties(),
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        getSortOrders("id"),
+        indexes);
+
+    tableCatalog.alterTable(
+        tableIdentifier, TableChange.updateColumnNullability(new String[] {"score"}, true));
+    tableCatalog.alterTable(
+        tableIdentifier,
+        TableChange.updateColumnComment(new String[] {"score"}, "score column changed"));
+    tableCatalog.alterTable(tableIdentifier, TableChange.deleteIndex("idx_note", false));
+    Table loaded = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertTrue(loaded.columns()[1].nullable());
+    Assertions.assertEquals("score column changed", loaded.columns()[1].comment());
+    Assertions.assertFalse(
+        Arrays.stream(loaded.index()).anyMatch(index -> Objects.equals(index.name(), "idx_note")));
+
+    Assertions.assertDoesNotThrow(
+        () ->
+            tableCatalog.alterTable(
+                tableIdentifier,
+                TableChange.deleteColumn(new String[] {"missing_col"}, true),
+                TableChange.deleteIndex("missing_idx", true)));
+
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            tableCatalog.alterTable(
+                tableIdentifier, TableChange.deleteIndex("missing_idx", false)));
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            tableCatalog.alterTable(
+                tableIdentifier, TableChange.deleteColumn(new String[] {"missing_col"}, false)));
+    Assertions.assertThrows(
+        UnsupportedOperationException.class,
+        () -> tableCatalog.alterTable(tableIdentifier, TableChange.setProperty("k", "v")));
+    Assertions.assertThrows(
+        UnsupportedOperationException.class,
+        () -> tableCatalog.alterTable(tableIdentifier, TableChange.removeProperty("k")));
+    Assertions.assertThrows(
+        UnsupportedOperationException.class,
+        () ->
+            tableCatalog.alterTable(
+                tableIdentifier, TableChange.updateColumnAutoIncrement(new String[] {"id"}, true)));
+    Assertions.assertThrows(
+        UnsupportedOperationException.class,
+        () ->
+            tableCatalog.alterTable(
+                tableIdentifier,
+                TableChange.addColumn(
+                    new String[] {"parent", "nested"},
+                    Types.IntegerType.get(),
+                    "nested",
+                    TableChange.ColumnPosition.defaultPos())));
+  }
+
+  @Test
+  void testAlterIndexAndAutoIncrementBranches() {
+    String tableWithIndexes = GravitinoITUtils.genRandomName("alter_idx_branch");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableWithIndexes);
+    Column[] columns =
+        new Column[] {
+          Column.of("id", Types.IntegerType.get(), "id", false, false, DEFAULT_VALUE_NOT_SET),
+          Column.of("score", Types.IntegerType.get(), "score", false, false, DEFAULT_VALUE_NOT_SET),
+          Column.of("note", Types.StringType.get(), "note")
+        };
+    Index[] indexes =
+        new Index[] {
+          Indexes.of(
+              Index.IndexType.DATA_SKIPPING_MINMAX, "idx_score_minmax", new String[][] {{"score"}}),
+          Indexes.of(
+              Index.IndexType.DATA_SKIPPING_BLOOM_FILTER,
+              "idx_note_bloom",
+              new String[][] {{"note"}})
+        };
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        createProperties(),
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        getSortOrders("id"),
+        indexes);
+
+    tableCatalog.alterTable(
+        tableIdentifier,
+        TableChange.addIndex(
+            Index.IndexType.DATA_SKIPPING_MINMAX, "idx_new", new String[][] {{"score"}}));
+    Table loaded = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertTrue(
+        Arrays.stream(loaded.index()).anyMatch(index -> Objects.equals(index.name(), "idx_new")));
+
+    tableCatalog.alterTable(
+        tableIdentifier,
+        TableChange.deleteIndex("idx_score_minmax", false),
+        TableChange.deleteIndex("idx_note_bloom", false),
+        TableChange.deleteIndex("idx_new", false));
+    loaded = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertFalse(
+        Arrays.stream(loaded.index())
+            .anyMatch(index -> Objects.equals(index.name(), "idx_score_minmax")));
+    Assertions.assertFalse(
+        Arrays.stream(loaded.index())
+            .anyMatch(index -> Objects.equals(index.name(), "idx_note_bloom")));
+    Assertions.assertFalse(
+        Arrays.stream(loaded.index()).anyMatch(index -> Objects.equals(index.name(), "idx_new")));
+
+    RuntimeException autoIncrementTrueException =
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () ->
+                tableCatalog.alterTable(
+                    tableIdentifier,
+                    TableChange.updateColumnAutoIncrement(new String[] {"id"}, true)));
+    Assertions.assertTrue(
+        autoIncrementTrueException.getMessage().contains("auto increment is not supported"));
+
+    RuntimeException autoIncrementFalseException =
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () ->
+                tableCatalog.alterTable(
+                    tableIdentifier,
+                    TableChange.updateColumnAutoIncrement(new String[] {"id"}, false)));
+    Assertions.assertTrue(
+        autoIncrementFalseException.getMessage().contains("auto increment is not supported"));
+  }
+
+  @Test
+  void testCreateTableWithAutoIncrementUnsupported() {
+    String tableName = GravitinoITUtils.genRandomName("create_auto_inc");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Column[] columns =
+        new Column[] {
+          Column.of("id", Types.IntegerType.get(), "id", false, true, DEFAULT_VALUE_NOT_SET),
+          Column.of("name", Types.StringType.get(), "name", true, false, DEFAULT_VALUE_NOT_SET)
+        };
+
+    RuntimeException exception =
+        Assertions.assertThrows(
+            RuntimeException.class,
+            () ->
+                catalog
+                    .asTableCatalog()
+                    .createTable(
+                        tableIdentifier,
+                        columns,
+                        table_comment,
+                        createProperties(),
+                        Transforms.EMPTY_TRANSFORM,
+                        Distributions.NONE,
+                        getSortOrders("id")));
+    Assertions.assertTrue(exception.getMessage().contains("auto increment"));
+  }
+
   @Test
   void testDropClickHouseDatabase() {
     String schemaName = GravitinoITUtils.genRandomName("clickhouse_schema").toLowerCase();
@@ -1206,7 +1386,8 @@ public class CatalogClickHouseIT extends BaseIT {
     Throwable excep =
         Assertions.assertThrows(
             RuntimeException.class, () -> catalog.asSchemas().dropSchema(schemaName, false));
-    Assertions.assertTrue(excep.getMessage().contains("the value of cascade should be true."));
+    Assertions.assertTrue(
+        excep.getMessage().contains("Database %s is not empty".formatted(schemaName)));
 
     // Check the database still exists
     catalog.asSchemas().loadSchema(schemaName);
@@ -1222,6 +1403,79 @@ public class CatalogClickHouseIT extends BaseIT {
         });
   }
 
+  @Test
+  void testCreateSameTableNameAcrossSchemas() {
+    String firstSchema = GravitinoITUtils.genRandomName("ck_db1").toLowerCase();
+    String secondSchema = GravitinoITUtils.genRandomName("ck_db2").toLowerCase();
+    String sharedTableName = GravitinoITUtils.genRandomName("ck_t").toLowerCase();
+    String firstTableComment = "first schema table";
+    String secondTableComment = "second schema table";
+
+    SupportsSchemas schemaSupport = catalog.asSchemas();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    schemaSupport.createSchema(firstSchema, null, Collections.emptyMap());
+    schemaSupport.createSchema(secondSchema, null, Collections.emptyMap());
+
+    try {
+      Column[] firstSchemaColumns =
+          new Column[] {
+            Column.of("id", Types.IntegerType.get(), "id", false, false, DEFAULT_VALUE_NOT_SET),
+            Column.of(
+                "first_value", Types.StringType.get(), "value", true, false, DEFAULT_VALUE_NOT_SET)
+          };
+      Column[] secondSchemaColumns =
+          new Column[] {
+            Column.of("id", Types.IntegerType.get(), "id", false, false, DEFAULT_VALUE_NOT_SET),
+            Column.of(
+                "second_value", Types.LongType.get(), "value", true, false, DEFAULT_VALUE_NOT_SET)
+          };
+
+      NameIdentifier firstTableIdentifier = NameIdentifier.of(firstSchema, sharedTableName);
+      NameIdentifier secondTableIdentifier = NameIdentifier.of(secondSchema, sharedTableName);
+
+      tableCatalog.createTable(
+          firstTableIdentifier,
+          firstSchemaColumns,
+          firstTableComment,
+          createProperties(),
+          Transforms.EMPTY_TRANSFORM,
+          Distributions.NONE,
+          getSortOrders("id"));
+      Table firstTable = tableCatalog.loadTable(firstTableIdentifier);
+
+      tableCatalog.createTable(
+          secondTableIdentifier,
+          secondSchemaColumns,
+          secondTableComment,
+          createProperties(),
+          Transforms.EMPTY_TRANSFORM,
+          Distributions.NONE,
+          getSortOrders("id"));
+
+      Table secondTable = tableCatalog.loadTable(secondTableIdentifier);
+      Assertions.assertEquals(sharedTableName, firstTable.name());
+      Assertions.assertEquals(sharedTableName, secondTable.name());
+      Assertions.assertEquals(firstTableComment, firstTable.comment());
+      Assertions.assertEquals(secondTableComment, secondTable.comment());
+      Assertions.assertEquals("first_value", firstTable.columns()[1].name());
+      Assertions.assertEquals("second_value", secondTable.columns()[1].name());
+
+      Set<String> firstSchemaTables =
+          Arrays.stream(tableCatalog.listTables(Namespace.of(firstSchema)))
+              .map(NameIdentifier::name)
+              .collect(Collectors.toSet());
+      Set<String> secondSchemaTables =
+          Arrays.stream(tableCatalog.listTables(Namespace.of(secondSchema)))
+              .map(NameIdentifier::name)
+              .collect(Collectors.toSet());
+      Assertions.assertEquals(Collections.singleton(sharedTableName), 
firstSchemaTables);
+      Assertions.assertEquals(Collections.singleton(sharedTableName), 
secondSchemaTables);
+    } finally {
+      schemaSupport.dropSchema(firstSchema, true);
+      schemaSupport.dropSchema(secondSchema, true);
+    }
+  }
+
   @Test
   public void testSchemaComment() {
     final String testSchemaName = GravitinoITUtils.genRandomName("test");
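[Editorial note between hunks: the cross-schema test above relies on table identity being the pair (namespace, name) rather than the bare name. A minimal, dependency-free sketch of that grouping idiom follows; `Ident` and `tablesBySchema` are hypothetical stand-ins, not Gravitino API.]

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class QualifiedNameSketch {
  // Hypothetical stand-in for Gravitino's NameIdentifier: a schema plus a table name.
  record Ident(String schema, String table) {}

  // Group identifiers by schema and collect table names per schema,
  // mirroring the listTables(...) -> Set<String> check in the test above.
  static Map<String, Set<String>> tablesBySchema(List<Ident> idents) {
    return idents.stream()
        .collect(
            Collectors.groupingBy(
                Ident::schema, Collectors.mapping(Ident::table, Collectors.toSet())));
  }

  public static void main(String[] args) {
    List<Ident> idents =
        List.of(new Ident("ck_db1", "ck_t"), new Ident("ck_db2", "ck_t"));
    Map<String, Set<String>> bySchema = tablesBySchema(idents);
    // The same table name appears independently under each schema.
    assert bySchema.get("ck_db1").equals(Set.of("ck_t"));
    assert bySchema.get("ck_db2").equals(Set.of("ck_t"));
    System.out.println("ok");
  }
}
```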
diff --git 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseDatabaseOperations.java
 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseDatabaseOperations.java
index 8b2fb1f23c..e728b43736 100644
--- 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseDatabaseOperations.java
+++ 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseDatabaseOperations.java
@@ -18,13 +18,17 @@
  */
 package org.apache.gravitino.catalog.clickhouse.operations;
 
+import static 
org.apache.gravitino.catalog.clickhouse.operations.ClickHouseClusterUtils.CLUSTER_META_PREFIX;
+import static 
org.apache.gravitino.catalog.clickhouse.operations.ClickHouseClusterUtils.embedClusterInComment;
+import static 
org.apache.gravitino.catalog.clickhouse.operations.ClickHouseClusterUtils.extractClusterFromComment;
+import static 
org.apache.gravitino.catalog.clickhouse.operations.ClickHouseClusterUtils.stripClusterMetadata;
+
 import java.sql.Connection;
 import java.sql.Statement;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import javax.sql.DataSource;
-import org.apache.gravitino.catalog.clickhouse.ClickHouseConfig;
 import 
org.apache.gravitino.catalog.clickhouse.ClickHouseConstants.ClusterConstants;
 import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
 import org.junit.jupiter.api.Assertions;
@@ -38,43 +42,146 @@ public class TestClickHouseDatabaseOperations {
       return generateCreateDatabaseSql(databaseName, comment, properties);
     }
 
-    String buildDropSql(String databaseName, boolean cascade) {
-      return generateDropDatabaseSql(databaseName, cascade);
+    String buildDropSql(String databaseName, String clusterName) {
+      return generateDropDatabaseSql(databaseName, clusterName);
     }
   }
 
-  private TestableClickHouseDatabaseOperations newOps(Map<String, String> 
conf) {
+  private TestableClickHouseDatabaseOperations newOps() {
     TestableClickHouseDatabaseOperations ops = new 
TestableClickHouseDatabaseOperations();
-    ops.initialize(null, new JdbcExceptionConverter(), conf);
+    ops.initialize(null, new JdbcExceptionConverter(), new HashMap<>());
     return ops;
   }
 
+  // 
---------------------------------------------------------------------------
+  // CREATE DATABASE SQL generation
+  // 
---------------------------------------------------------------------------
+
   @Test
   void testGenerateCreateDatabaseSqlWithoutCluster() {
-    Map<String, String> conf = new HashMap<>();
-    String sql = newOps(conf).buildCreateSql("db_name", null, 
Collections.emptyMap());
+    String sql = newOps().buildCreateSql("db_name", null, 
Collections.emptyMap());
     Assertions.assertEquals("CREATE DATABASE `db_name`", sql);
   }
 
   @Test
-  void testGenerateCreateDatabaseSqlWithClusterNameButDisabled() {
-    Map<String, String> conf = new HashMap<>();
-    conf.put(ClickHouseConfig.CK_CLUSTER_NAME.getKey(), "ck_cluster");
+  void testGenerateCreateDatabaseSqlWithComment() {
+    String sql = newOps().buildCreateSql("db_name", "my comment", 
Collections.emptyMap());
+    Assertions.assertEquals("CREATE DATABASE `db_name` COMMENT 'my comment'", 
sql);
+  }
 
-    String sql = newOps(conf).buildCreateSql("db_name", "comment", 
Collections.emptyMap());
+  @Test
+  void testGenerateCreateDatabaseSqlWithClusterNameButDisabled() {
+    String sql = newOps().buildCreateSql("db_name", "comment", 
Collections.emptyMap());
     Assertions.assertEquals("CREATE DATABASE `db_name` COMMENT 'comment'", 
sql);
   }
 
   @Test
-  void testGenerateCreateDatabaseSqlWithClusterEnabled() {
+  void testGenerateCreateDatabaseSqlWithClusterNoComment() {
     Map<String, String> properties = new HashMap<>();
     properties.put(ClusterConstants.CLUSTER_NAME, "ck_cluster");
     properties.put(ClusterConstants.ON_CLUSTER, "true");
 
-    String sql = newOps(new HashMap<>()).buildCreateSql("db_name", null, 
properties);
-    Assertions.assertEquals("CREATE DATABASE `db_name` ON CLUSTER 
`ck_cluster`", sql);
+    String sql = newOps().buildCreateSql("db_name", null, properties);
+    // Cluster metadata embedded in COMMENT; user comment is empty
+    String expectedComment = CLUSTER_META_PREFIX + "ck_cluster";
+    Assertions.assertEquals(
+        "CREATE DATABASE `db_name` ON CLUSTER `ck_cluster` COMMENT '" + 
expectedComment + "'", sql);
+  }
+
+  @Test
+  void testGenerateCreateDatabaseSqlWithClusterAndComment() {
+    Map<String, String> properties = new HashMap<>();
+    properties.put(ClusterConstants.CLUSTER_NAME, "ck_cluster");
+    properties.put(ClusterConstants.ON_CLUSTER, "true");
+
+    String sql = newOps().buildCreateSql("db_name", "my comment", properties);
+    // User comment preserved before the separator
+    String expectedComment = "my comment" + CLUSTER_META_PREFIX + "ck_cluster";
+    Assertions.assertEquals(
+        "CREATE DATABASE `db_name` ON CLUSTER `ck_cluster` COMMENT '" + 
expectedComment + "'", sql);
+  }
+
+  // 
---------------------------------------------------------------------------
+  // DROP DATABASE SQL generation
+  // 
---------------------------------------------------------------------------
+
+  @Test
+  void testGenerateDropDatabaseSqlWithoutCluster() {
+    String sql = newOps().buildDropSql("db_name", null);
+    Assertions.assertEquals("DROP DATABASE `db_name`", sql);
+  }
+
+  @Test
+  void testGenerateDropDatabaseSqlWithBlankCluster() {
+    String sql = newOps().buildDropSql("db_name", "");
+    Assertions.assertEquals("DROP DATABASE `db_name`", sql);
+  }
+
+  @Test
+  void testGenerateDropDatabaseSqlWithCluster() {
+    String sql = newOps().buildDropSql("db_name", "ck_cluster");
+    Assertions.assertEquals("DROP DATABASE `db_name` ON CLUSTER `ck_cluster`", 
sql);
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Comment metadata helpers
+  // 
---------------------------------------------------------------------------
+
+  @Test
+  void testEmbedClusterInCommentNullUserComment() {
+    String stored = embedClusterInComment(null, "mycluster");
+    Assertions.assertEquals(CLUSTER_META_PREFIX + "mycluster", stored);
+  }
+
+  @Test
+  void testEmbedClusterInCommentWithUserComment() {
+    String stored = embedClusterInComment("hello", "mycluster");
+    Assertions.assertEquals("hello" + CLUSTER_META_PREFIX + "mycluster", 
stored);
+  }
+
+  @Test
+  void testExtractClusterFromCommentPresent() {
+    String stored = embedClusterInComment("some comment", "ck_cluster");
+    Assertions.assertEquals("ck_cluster", extractClusterFromComment(stored));
+  }
+
+  @Test
+  void testExtractClusterFromCommentAbsent() {
+    Assertions.assertNull(extractClusterFromComment("plain comment"));
+  }
+
+  @Test
+  void testExtractClusterFromCommentNull() {
+    Assertions.assertNull(extractClusterFromComment(null));
+  }
+
+  @Test
+  void testStripClusterMetadataPresent() {
+    String stored = embedClusterInComment("user comment", "mycluster");
+    Assertions.assertEquals("user comment", stripClusterMetadata(stored));
+  }
+
+  @Test
+  void testStripClusterMetadataAbsent() {
+    Assertions.assertEquals("plain comment", stripClusterMetadata("plain 
comment"));
+  }
+
+  @Test
+  void testStripClusterMetadataNull() {
+    Assertions.assertNull(stripClusterMetadata(null));
   }
 
+  @Test
+  void testStripClusterMetadataNoUserComment() {
+    // When the user has no comment but cluster info is embedded
+    String stored = embedClusterInComment(null, "mycluster");
+    Assertions.assertEquals("", stripClusterMetadata(stored));
+  }
+
+  // 
---------------------------------------------------------------------------
+  // CREATE uses system catalog before execution
+  // 
---------------------------------------------------------------------------
+
   @Test
   void testCreateUsesSystemCatalogBeforeExecution() throws Exception {
     DataSource dataSource = Mockito.mock(DataSource.class);
@@ -93,11 +200,4 @@ public class TestClickHouseDatabaseOperations {
     Mockito.verify(connection).setCatalog("information_schema");
     Mockito.verify(statement).executeUpdate("CREATE DATABASE `new_db`");
   }
-
-  @Test
-  void testGenerateDropDatabaseSqlWithoutCluster() {
-    Map<String, String> conf = new HashMap<>();
-    String sql = newOps(conf).buildDropSql("db_name", true);
-    Assertions.assertEquals("DROP DATABASE `db_name`", sql);
-  }
 }
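[Editorial note between hunks: the comment-metadata tests above pin down the contract of `embedClusterInComment` / `extractClusterFromComment` / `stripClusterMetadata`: embed appends a separator plus the cluster name to the (possibly null) user comment, extract returns the cluster or null, strip returns the user comment unchanged when no metadata is present. A self-contained sketch satisfying exactly those assertions follows; the separator value here is invented for illustration and is not the real `CLUSTER_META_PREFIX` from `ClickHouseClusterUtils`.]

```java
public class ClusterCommentSketch {
  // Hypothetical separator; the real CLUSTER_META_PREFIX lives in ClickHouseClusterUtils.
  static final String CLUSTER_META_PREFIX = "\n--gravitino.cluster=";

  // Append cluster metadata after the user comment (empty comment when null).
  static String embedClusterInComment(String userComment, String cluster) {
    return (userComment == null ? "" : userComment) + CLUSTER_META_PREFIX + cluster;
  }

  // Return the embedded cluster name, or null when absent or input is null.
  static String extractClusterFromComment(String stored) {
    if (stored == null) {
      return null;
    }
    int idx = stored.indexOf(CLUSTER_META_PREFIX);
    return idx < 0 ? null : stored.substring(idx + CLUSTER_META_PREFIX.length());
  }

  // Return the user-visible comment with metadata removed; pass through otherwise.
  static String stripClusterMetadata(String stored) {
    if (stored == null) {
      return null;
    }
    int idx = stored.indexOf(CLUSTER_META_PREFIX);
    return idx < 0 ? stored : stored.substring(0, idx);
  }

  public static void main(String[] args) {
    String stored = embedClusterInComment("hello", "ck_cluster");
    assert "ck_cluster".equals(extractClusterFromComment(stored));
    assert "hello".equals(stripClusterMetadata(stored));
    assert extractClusterFromComment("plain comment") == null;
    assert "".equals(stripClusterMetadata(embedClusterInComment(null, "c")));
    System.out.println("ok");
  }
}
```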
diff --git 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseEngineIT.java
 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseEngineIT.java
new file mode 100644
index 0000000000..18b6ea9335
--- /dev/null
+++ 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseEngineIT.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse.operations;
+
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.ENGINE;
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.GRAVITINO_ENGINE_KEY;
+
+import com.google.common.collect.Maps;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import javax.sql.DataSource;
+import org.apache.commons.lang3.RandomStringUtils;
+import 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseColumnDefaultValueConverter;
+import 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseExceptionConverter;
+import 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
+import org.apache.gravitino.catalog.jdbc.utils.DataSourceUtils;
+import org.apache.gravitino.integration.test.container.ClickHouseContainer;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.util.TestDatabaseName;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.types.Types;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.testcontainers.DockerClientFactory;
+
+/**
+ * Integration tests that verify Gravitino can create and load ClickHouse 
tables using various
+ * engine types, and that table metadata (not data) survives a ClickHouse 
server restart.
+ *
+ * <p>Key findings documented here:
+ *
+ * <ul>
+ *   <li><b>TinyLog / StripeLog / Log</b>: Fully supported. Table definition 
and data persist after
+ *       restart.
+ *   <li><b>Memory</b>: Table definition persists after restart (loadTable 
succeeds), but all data
+ *       is lost. Gravitino metadata and ClickHouse are not in conflict, but 
users must be aware of
+ *       the data volatility.
+ *   <li><b>Null</b>: Supported. Table definition persists. Data is always 
discarded by design.
+ *   <li><b>Set</b>: Supported. Table definition and data persist after 
restart (ClickHouse 24.x+
+ *       stores Set engine data on disk).
+ *   <li><b>Join</b>: ❌ NOT directly creatable via Gravitino. ClickHouse 
requires {@code ENGINE =
+ *       Join(strictness, join_type, keys...)} with at least 3 parameters; 
Gravitino passes only the
+ *       bare engine name and ClickHouse rejects it.
+ *   <li><b>Buffer</b>: ❌ Requires parameterized ENGINE clause (destination 
database/table and flush
+ *       thresholds); Gravitino's CREATE TABLE does not support arbitrary 
ENGINE parameters, so
+ *       Buffer is not directly creatable via the Gravitino API.
+ *   <li><b>View</b>: Must be created with a SELECT query (CREATE VIEW ... AS 
SELECT); this is not a
+ *       standard table ENGINE and cannot be created via Gravitino's CREATE 
TABLE API.
+ *   <li><b>KeeperMap</b>: Requires a ZooKeeper/ClickHouse-Keeper path 
parameter in the ENGINE
+ *       clause; not available in a single-node setup without Keeper.
+ *   <li><b>File</b>: Requires a file format parameter and specific 
server-side storage
+ *       configuration; not directly creatable via Gravitino's CREATE TABLE 
API.
+ * </ul>
+ */
+@Tag("gravitino-docker-test")
+@TestInstance(Lifecycle.PER_CLASS)
+public class TestClickHouseEngineIT {
+
+  private static final ContainerSuite containerSuite = 
ContainerSuite.getInstance();
+  private static final TestDatabaseName TEST_DB_NAME = 
TestDatabaseName.CLICKHOUSE_ENGINE_IT;
+
+  private ClickHouseContainer clickHouseContainer;
+  private ClickHouseDatabaseOperations databaseOperations;
+  private ClickHouseTableOperations tableOperations;
+  private String schemaName;
+
+  @BeforeAll
+  public void setup() throws Exception {
+    containerSuite.startClickHouseContainer(TEST_DB_NAME);
+    clickHouseContainer = containerSuite.getClickHouseContainer();
+
+    Map<String, String> catalogProperties = Maps.newHashMap();
+    catalogProperties.put(JdbcConfig.JDBC_URL.getKey(), 
clickHouseContainer.getJdbcUrl());
+    catalogProperties.put(
+        JdbcConfig.JDBC_DRIVER.getKey(), 
clickHouseContainer.getDriverClassName(TEST_DB_NAME));
+    catalogProperties.put(JdbcConfig.USERNAME.getKey(), 
clickHouseContainer.getUsername());
+    catalogProperties.put(JdbcConfig.PASSWORD.getKey(), 
clickHouseContainer.getPassword());
+
+    DataSource dataSource = 
DataSourceUtils.createDataSource(catalogProperties);
+
+    ClickHouseExceptionConverter exceptionConverter = new 
ClickHouseExceptionConverter();
+    databaseOperations = new ClickHouseDatabaseOperations();
+    databaseOperations.initialize(dataSource, exceptionConverter, 
Collections.emptyMap());
+
+    tableOperations = new ClickHouseTableOperations();
+    tableOperations.initialize(
+        dataSource,
+        exceptionConverter,
+        new ClickHouseTypeConverter(),
+        new ClickHouseColumnDefaultValueConverter(),
+        Collections.emptyMap());
+
+    schemaName = "engine_it_" + 
RandomStringUtils.randomAlphanumeric(6).toLowerCase();
+    databaseOperations.create(schemaName, null, Collections.emptyMap());
+  }
+
+  @AfterAll
+  public void cleanup() {
+    if (databaseOperations != null && schemaName != null) {
+      databaseOperations.delete(schemaName, true);
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Log-family engines (TinyLog / StripeLog / Log)
+  // 
---------------------------------------------------------------------------
+
+  @Test
+  void testTinyLogEngineCreateLoadAndPersistAfterRestart() throws Exception {
+    String tableName = "tinylog_" + 
RandomStringUtils.randomAlphanumeric(6).toLowerCase();
+    createSimpleTable(schemaName, tableName, ENGINE.TINYLOG);
+
+    JdbcTable before = tableOperations.load(schemaName, tableName);
+    Assertions.assertEquals(
+        ENGINE.TINYLOG.getValue(), 
before.properties().get(GRAVITINO_ENGINE_KEY));
+
+    insertRow(tableName, 1);
+    Assertions.assertEquals(1, countRows(tableName));
+
+    restartClickHouseAndWait();
+
+    JdbcTable after = tableOperations.load(schemaName, tableName);
+    Assertions.assertEquals(
+        ENGINE.TINYLOG.getValue(), 
after.properties().get(GRAVITINO_ENGINE_KEY));
+    // TinyLog persists data on disk — row must still be there
+    Assertions.assertEquals(1, countRows(tableName));
+  }
+
+  @Test
+  void testStripeLogEngineCreateLoadAndPersistAfterRestart() throws Exception {
+    String tableName = "stripelog_" + 
RandomStringUtils.randomAlphanumeric(6).toLowerCase();
+    createSimpleTable(schemaName, tableName, ENGINE.STRIPELOG);
+
+    JdbcTable before = tableOperations.load(schemaName, tableName);
+    Assertions.assertEquals(
+        ENGINE.STRIPELOG.getValue(), 
before.properties().get(GRAVITINO_ENGINE_KEY));
+
+    insertRow(tableName, 2);
+    Assertions.assertEquals(1, countRows(tableName));
+
+    restartClickHouseAndWait();
+
+    JdbcTable after = tableOperations.load(schemaName, tableName);
+    Assertions.assertEquals(
+        ENGINE.STRIPELOG.getValue(), 
after.properties().get(GRAVITINO_ENGINE_KEY));
+    // StripeLog persists data on disk — row must still be there
+    Assertions.assertEquals(1, countRows(tableName));
+  }
+
+  @Test
+  void testLogEngineCreateLoadAndPersistAfterRestart() throws Exception {
+    String tableName = "log_" + 
RandomStringUtils.randomAlphanumeric(6).toLowerCase();
+    createSimpleTable(schemaName, tableName, ENGINE.LOG);
+
+    JdbcTable before = tableOperations.load(schemaName, tableName);
+    Assertions.assertEquals(ENGINE.LOG.getValue(), 
before.properties().get(GRAVITINO_ENGINE_KEY));
+
+    insertRow(tableName, 3);
+    Assertions.assertEquals(1, countRows(tableName));
+
+    restartClickHouseAndWait();
+
+    JdbcTable after = tableOperations.load(schemaName, tableName);
+    Assertions.assertEquals(ENGINE.LOG.getValue(), 
after.properties().get(GRAVITINO_ENGINE_KEY));
+    // Log persists data on disk — row must still be there
+    Assertions.assertEquals(1, countRows(tableName));
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Memory engine (volatile: data is lost on restart)
+  // 
---------------------------------------------------------------------------
+
+  @Test
+  void testMemoryEngineIsVolatileAfterRestart() throws Exception {
+    String tableName = "memory_" + 
RandomStringUtils.randomAlphanumeric(6).toLowerCase();
+    createSimpleTable(schemaName, tableName, ENGINE.MEMORY);
+
+    JdbcTable before = tableOperations.load(schemaName, tableName);
+    Assertions.assertEquals(
+        ENGINE.MEMORY.getValue(), 
before.properties().get(GRAVITINO_ENGINE_KEY));
+
+    insertRow(tableName, 10);
+    Assertions.assertEquals(1, countRows(tableName));
+
+    restartClickHouseAndWait();
+
+    // Table DEFINITION persists — loadTable must still succeed
+    JdbcTable after = tableOperations.load(schemaName, tableName);
+    Assertions.assertEquals(ENGINE.MEMORY.getValue(), 
after.properties().get(GRAVITINO_ENGINE_KEY));
+
+    // But DATA is volatile — must be empty after restart
+    Assertions.assertEquals(
+        0,
+        countRows(tableName),
+        "Memory engine data must be empty after ClickHouse restart (volatile 
engine)");
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Null engine (data is always discarded)
+  // 
---------------------------------------------------------------------------
+
+  @Test
+  void testNullEngineCreateLoadAndPersistAfterRestart() throws Exception {
+    String tableName = "null_" + 
RandomStringUtils.randomAlphanumeric(6).toLowerCase();
+    createSimpleTable(schemaName, tableName, ENGINE.NULL);
+
+    JdbcTable before = tableOperations.load(schemaName, tableName);
+    Assertions.assertEquals(ENGINE.NULL.getValue(), 
before.properties().get(GRAVITINO_ENGINE_KEY));
+
+    restartClickHouseAndWait();
+
+    JdbcTable after = tableOperations.load(schemaName, tableName);
+    Assertions.assertEquals(ENGINE.NULL.getValue(), 
after.properties().get(GRAVITINO_ENGINE_KEY));
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Set engine
+  // 
---------------------------------------------------------------------------
+
+  @Test
+  void testSetEngineCreateLoadAndPersistAfterRestart() throws Exception {
+    String tableName = "set_" + 
RandomStringUtils.randomAlphanumeric(6).toLowerCase();
+    createSimpleTable(schemaName, tableName, ENGINE.SET);
+
+    JdbcTable before = tableOperations.load(schemaName, tableName);
+    Assertions.assertEquals(ENGINE.SET.getValue(), 
before.properties().get(GRAVITINO_ENGINE_KEY));
+
+    restartClickHouseAndWait();
+
+    JdbcTable after = tableOperations.load(schemaName, tableName);
+    Assertions.assertEquals(ENGINE.SET.getValue(), 
after.properties().get(GRAVITINO_ENGINE_KEY));
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Join engine — NOT directly creatable via Gravitino's CREATE TABLE API.
+  // ClickHouse requires: ENGINE = Join(ANY|ALL|SEMI|ANTI, LEFT|INNER|RIGHT, 
keys...)
+  // Gravitino passes only the bare engine name, which is rejected by 
ClickHouse:
+  //   "Storage Join requires at least 3 parameters"
+  // This is documented in the class Javadoc and in 
docs/jdbc-clickhouse-catalog.md.
+  // 
---------------------------------------------------------------------------
+
+  // 
---------------------------------------------------------------------------
+  // Helpers
+  // 
---------------------------------------------------------------------------
+
+  /** Creates a minimal two-column table with the given engine (no ORDER BY, 
no indexes). */
+  private void createSimpleTable(String dbName, String tableName, ENGINE 
engine) {
+    JdbcColumn[] columns = {
+      JdbcColumn.builder()
+          .withName("id")
+          .withType(Types.IntegerType.get())
+          .withNullable(false)
+          .build(),
+      JdbcColumn.builder()
+          .withName("name")
+          .withType(Types.StringType.get())
+          .withNullable(true)
+          .build()
+    };
+    Map<String, String> props = new HashMap<>();
+    props.put(GRAVITINO_ENGINE_KEY, engine.getValue());
+
+    tableOperations.create(
+        dbName,
+        tableName,
+        columns,
+        "engine test table",
+        props,
+        null,
+        Distributions.NONE,
+        new Index[0],
+        null);
+  }
+
+  private void insertRow(String tableName, int id) throws Exception {
+    try (Connection conn =
+            DriverManager.getConnection(
+                clickHouseContainer.getJdbcUrl(TEST_DB_NAME),
+                clickHouseContainer.getUsername(),
+                clickHouseContainer.getPassword());
+        Statement stmt = conn.createStatement()) {
+      stmt.execute(
+          String.format(
+              "INSERT INTO `%s`.`%s` (id, name) VALUES (%d, 'test')", 
schemaName, tableName, id));
+    }
+  }
+
+  private int countRows(String tableName) throws Exception {
+    try (Connection conn =
+            DriverManager.getConnection(
+                clickHouseContainer.getJdbcUrl(TEST_DB_NAME),
+                clickHouseContainer.getUsername(),
+                clickHouseContainer.getPassword());
+        Statement stmt = conn.createStatement();
+        ResultSet rs =
+            stmt.executeQuery(
+                String.format("SELECT count() FROM `%s`.`%s`", schemaName, 
tableName))) {
+      return rs.next() ? rs.getInt(1) : 0;
+    }
+  }
+
+  /**
+   * Restarts the ClickHouse Docker container and waits until the server is 
ready again. After
+   * restart the container keeps the same internal IP and mapped ports, so the 
existing DataSource
+   * reconnects automatically via HikariCP's stale-connection detection.
+   */
+  private void restartClickHouseAndWait() throws Exception {
+    String containerId = clickHouseContainer.getContainer().getContainerId();
+    
DockerClientFactory.instance().client().restartContainerCmd(containerId).exec();
+
+    String jdbcUrl = clickHouseContainer.getJdbcUrl();
+    String user = clickHouseContainer.getUsername();
+    String password = clickHouseContainer.getPassword();
+
+    Awaitility.await()
+        .atMost(Duration.ofMinutes(2))
+        .pollInterval(Duration.ofSeconds(3))
+        .ignoreExceptions()
+        .until(
+            () -> {
+              try (Connection conn = DriverManager.getConnection(jdbcUrl, 
user, password);
+                  Statement stmt = conn.createStatement()) {
+                stmt.execute("SELECT 1");
+                return true;
+              }
+            });
+  }
+}
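[Editorial note between hunks: `restartClickHouseAndWait` above is a bounded poll loop that swallows transient connection failures, expressed via Awaitility. The same pattern without the Awaitility dependency can be sketched as below; `waitUntilReady` and the flaky probe are illustrative names, not part of the patch.]

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class RestartWaitSketch {
  // Poll a probe until it succeeds or the deadline passes, swallowing
  // transient failures the way Awaitility's ignoreExceptions() does.
  static boolean waitUntilReady(BooleanSupplier probe, long timeoutMs, long pollMs)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      try {
        if (probe.getAsBoolean()) {
          return true;
        }
      } catch (RuntimeException ignored) {
        // Server not up yet (e.g. connection refused): keep polling.
      }
      Thread.sleep(pollMs);
    }
    return false;
  }

  public static void main(String[] args) throws InterruptedException {
    // Probe that fails twice (like a restarting server) and then succeeds.
    AtomicInteger attempts = new AtomicInteger();
    BooleanSupplier flaky =
        () -> {
          if (attempts.incrementAndGet() < 3) {
            throw new RuntimeException("connection refused");
          }
          return true;
        };
    assert waitUntilReady(flaky, 5_000, 10);
    // A probe that never succeeds times out and reports false.
    assert !waitUntilReady(() -> false, 100, 10);
    System.out.println("ok");
  }
}
```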
diff --git 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperations.java
 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperations.java
index 753b4110f1..a7cc763e30 100644
--- 
a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperations.java
+++ 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperations.java
@@ -41,7 +41,6 @@ import 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseExceptionConv
 import 
org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter;
 import org.apache.gravitino.catalog.jdbc.JdbcColumn;
 import org.apache.gravitino.catalog.jdbc.JdbcTable;
-import org.apache.gravitino.exceptions.GravitinoRuntimeException;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.NamedReference;
@@ -205,11 +204,10 @@ public class TestClickHouseTableOperations extends 
TestClickHouse {
     Assertions.assertTrue(
         TABLE_OPERATIONS.drop(TEST_DB_NAME.toString(), newName), "table should 
be dropped");
 
-    GravitinoRuntimeException exception =
-        Assertions.assertThrows(
-            GravitinoRuntimeException.class,
-            () -> TABLE_OPERATIONS.drop(TEST_DB_NAME.toString(), newName));
-    Assertions.assertTrue(StringUtils.contains(exception.getMessage(), "does 
not exist"));
+    // Dropping a table that no longer exists should return false, not throw.
+    Assertions.assertFalse(
+        TABLE_OPERATIONS.drop(TEST_DB_NAME.toString(), newName),
+        "dropping non-existent table should return false");
   }
 
   @Test
@@ -1058,6 +1056,43 @@ public class TestClickHouseTableOperations extends 
TestClickHouse {
     Assertions.assertTrue(sql.contains("INDEX `idx_c3` `c3` TYPE bloom_filter 
GRANULARITY 3"));
   }
 
+  @Test
+  void testGenerateCreateTableSqlWithAutoIncrementColumnUnsupported() {
+    TestableClickHouseTableOperations ops = new 
TestableClickHouseTableOperations();
+    ops.initialize(
+        null,
+        new ClickHouseExceptionConverter(),
+        new ClickHouseTypeConverter(),
+        new ClickHouseColumnDefaultValueConverter(),
+        new HashMap<>());
+
+    JdbcColumn[] cols =
+        new JdbcColumn[] {
+          JdbcColumn.builder()
+              .withName("id")
+              .withType(Types.IntegerType.get())
+              .withNullable(false)
+              .withAutoIncrement(true)
+              .withDefaultValue(DEFAULT_VALUE_NOT_SET)
+              .build()
+        };
+
+    UnsupportedOperationException exception =
+        Assertions.assertThrows(
+            UnsupportedOperationException.class,
+            () ->
+                ops.buildCreateSql(
+                    "t_auto_inc",
+                    cols,
+                    null,
+                    new HashMap<>(),
+                    Transforms.EMPTY_TRANSFORM,
+                    Distributions.NONE,
+                    Indexes.EMPTY_INDEXES,
+                    ClickHouseUtils.getSortOrders("id")));
+    Assertions.assertTrue(exception.getMessage().contains("auto increment"));
+  }
+
   @Test
   void testParsePartitioningAndIndexExpressions() {
     TestableClickHouseTableOperations ops = new 
TestableClickHouseTableOperations();
@@ -1076,6 +1111,28 @@ public class TestClickHouseTableOperations extends 
TestClickHouse {
     Assertions.assertArrayEquals(new String[][] {{"c4"}}, bloomFields);
   }
 
+  @Test
+  void testParseSortOrdersFromMultilineShowCreateSql() {
+    TestableClickHouseTableOperations ops = new 
TestableClickHouseTableOperations();
+    String showCreateSql =
+        """
+        CREATE TABLE `t1`
+        (
+          `id` Int32,
+          `event_time` DateTime
+        )
+        ENGINE = MergeTree
+        ORDER BY
+          (`id`, toDate(`event_time`))
+        SETTINGS index_granularity = 8192
+        """;
+
+    SortOrder[] sortOrders = ops.parseSortOrders(showCreateSql);
+    Assertions.assertEquals(2, sortOrders.length);
+    Assertions.assertTrue(sortOrders[0].expression() instanceof 
NamedReference);
+    Assertions.assertEquals("id", ((NamedReference) 
sortOrders[0].expression()).fieldName()[0]);
+  }
+
   private static final class TestableClickHouseTableOperations extends 
ClickHouseTableOperations {
     String buildCreateSql(
         String tableName,
@@ -1089,6 +1146,10 @@ public class TestClickHouseTableOperations extends 
TestClickHouse {
       return generateCreateTableSql(
           tableName, columns, comment, properties, partitioning, distribution, 
indexes, sortOrders);
     }
+
+    SortOrder[] parseSortOrders(String createSql) {
+      return parseSortOrdersFromCreateSql(createSql);
+    }
   }
 
   @Test
@@ -1121,6 +1182,8 @@ public class TestClickHouseTableOperations extends 
TestClickHouse {
           TableChange.updateColumnPosition(new String[] {"c1"}, 
TableChange.ColumnPosition.first()),
           TableChange.deleteColumn(new String[] {"c3"}, false),
           TableChange.updateColumnNullability(new String[] {"c2"}, false),
+          TableChange.addIndex(
+              Index.IndexType.DATA_SKIPPING_MINMAX, "idx2", new String[][] {{"c2"}}),
           TableChange.deleteIndex("idx1", false),
           TableChange.renameColumn(new String[] {"c2"}, "c2_new"),
           TableChange.updateComment("new_table_comment")
@@ -1135,6 +1198,7 @@ public class TestClickHouseTableOperations extends TestClickHouse {
     Assertions.assertTrue(sql.contains("COMMENT 'c1_comment'"));
     Assertions.assertTrue(sql.contains("FIRST"));
     Assertions.assertTrue(sql.contains("DROP COLUMN `c3`"));
+    Assertions.assertTrue(sql.contains("ADD INDEX `idx2` `c2` TYPE minmax GRANULARITY 1"));
     Assertions.assertTrue(sql.contains("DROP INDEX `idx1`"));
     Assertions.assertTrue(sql.contains("MODIFY COMMENT 'new_table_comment'"));
     Assertions.assertTrue(sql.startsWith("ALTER TABLE `tbl`"));
@@ -1182,6 +1246,61 @@ public class TestClickHouseTableOperations extends TestClickHouse {
                 "db", "tbl", new TableChange[] {TableChange.deleteIndex("missing", false)}));
   }
 
+  @Test
+  public void testAlterTableAddIndexBranches() {
+    StubClickHouseTableOperations ops = new StubClickHouseTableOperations();
+    ops.initialize(
+        null,
+        new ClickHouseExceptionConverter(),
+        new ClickHouseTypeConverter(),
+        new ClickHouseColumnDefaultValueConverter(),
+        new HashMap<>());
+    ops.setTable(buildStubTable());
+
+    String minMaxSql =
+        ops.buildAlterSql(
+            "db",
+            "tbl",
+            new TableChange[] {
+              TableChange.addIndex(
+                  Index.IndexType.DATA_SKIPPING_MINMAX, "idx_new", new String[][] {{"c2"}})
+            });
+    Assertions.assertTrue(minMaxSql.contains("ADD INDEX `idx_new` `c2` TYPE minmax GRANULARITY 1"));
+
+    String bloomSql =
+        ops.buildAlterSql(
+            "db",
+            "tbl",
+            new TableChange[] {
+              TableChange.addIndex(
+                  Index.IndexType.DATA_SKIPPING_BLOOM_FILTER, "idx_bf", new String[][] {{"c2"}})
+            });
+    Assertions.assertTrue(
+        bloomSql.contains("ADD INDEX `idx_bf` `c2` TYPE bloom_filter GRANULARITY 3"));
+
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            ops.buildAlterSql(
+                "db",
+                "tbl",
+                new TableChange[] {
+                  TableChange.addIndex(
+                      Index.IndexType.DATA_SKIPPING_MINMAX, "idx1", new String[][] {{"c2"}})
+                }));
+
+    Assertions.assertThrows(
+        UnsupportedOperationException.class,
+        () ->
+            ops.buildAlterSql(
+                "db",
+                "tbl",
+                new TableChange[] {
+                  TableChange.addIndex(
+                      Index.IndexType.PRIMARY_KEY, "pk_new", new String[][] {{"c1"}})
+                }));
+  }
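The ADD INDEX branches exercised above can be sketched roughly as follows. This is an illustrative, self-contained sketch, not the real `ClickHouseTableOperations` internals: `IndexKind` and `buildAddIndexClause` are hypothetical names, and the fixed granularities (1 for minmax, 3 for bloom_filter) merely mirror the assertions in the test.

```java
// Hypothetical sketch of ALTER TABLE ... ADD INDEX clause generation.
// The fixed granularities mirror the test expectations above; names are
// illustrative and do not exist in the Gravitino codebase.
public class AddIndexSketch {
  enum IndexKind { DATA_SKIPPING_MINMAX, DATA_SKIPPING_BLOOM_FILTER, PRIMARY_KEY }

  static String buildAddIndexClause(IndexKind kind, String indexName, String column) {
    switch (kind) {
      case DATA_SKIPPING_MINMAX:
        // minmax skipping indexes are created with a fixed granularity of 1
        return "ADD INDEX `" + indexName + "` `" + column + "` TYPE minmax GRANULARITY 1";
      case DATA_SKIPPING_BLOOM_FILTER:
        // bloom_filter skipping indexes use a fixed granularity of 3
        return "ADD INDEX `" + indexName + "` `" + column + "` TYPE bloom_filter GRANULARITY 3";
      default:
        // e.g. PRIMARY_KEY cannot be added via ALTER TABLE in ClickHouse
        throw new UnsupportedOperationException("Index type not supported in ALTER TABLE: " + kind);
    }
  }

  public static void main(String[] args) {
    System.out.println(buildAddIndexClause(IndexKind.DATA_SKIPPING_MINMAX, "idx_new", "c2"));
    System.out.println(buildAddIndexClause(IndexKind.DATA_SKIPPING_BLOOM_FILTER, "idx_bf", "c2"));
  }
}
```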
+
   @Test
   public void testAlterTableNullabilityValidationFails() {
     StubClickHouseTableOperations ops = new StubClickHouseTableOperations();
diff --git a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperationsCluster.java b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperationsCluster.java
index 0f521bcbbb..f231b9d4d3 100644
--- a/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperationsCluster.java
+++ b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/operations/TestClickHouseTableOperationsCluster.java
@@ -19,6 +19,9 @@
 package org.apache.gravitino.catalog.clickhouse.operations;
 
 import static org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.GRAVITINO_ENGINE_KEY;
+import static org.apache.gravitino.catalog.clickhouse.operations.ClickHouseClusterUtils.CLUSTER_META_PREFIX;
+import static org.apache.gravitino.catalog.clickhouse.operations.ClickHouseClusterUtils.extractClusterFromComment;
+import static org.apache.gravitino.catalog.clickhouse.operations.ClickHouseClusterUtils.stripClusterMetadata;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -28,9 +31,11 @@ import org.apache.gravitino.catalog.clickhouse.converter.ClickHouseColumnDefault
 import org.apache.gravitino.catalog.clickhouse.converter.ClickHouseExceptionConverter;
 import org.apache.gravitino.catalog.clickhouse.converter.ClickHouseTypeConverter;
 import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.rel.expressions.NamedReference;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
 import org.apache.gravitino.rel.expressions.distributions.Distributions;
 import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.sorts.SortOrders;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.indexes.Index;
 import org.apache.gravitino.rel.types.Types;
@@ -219,6 +224,142 @@ class TestClickHouseTableOperationsCluster {
         sql.contains("ENGINE = Distributed(`ck_cluster`,`remote_db`,`remote_table`,`user_id`)"));
   }
 
+  @Test
+  void testGenerateDropTableSqlWithoutCluster() {
+    Map<String, String> props = new HashMap<>();
+    props.put(ClusterConstants.ON_CLUSTER, "false");
+
+    String sql = ops.buildDropSql("orders", props);
+    Assertions.assertEquals("DROP TABLE `orders`", sql);
+  }
+
+  @Test
+  void testGenerateDropTableSqlWithNullProperties() {
+    String sql = ops.buildDropSql("orders", null);
+    Assertions.assertEquals("DROP TABLE `orders`", sql);
+  }
+
+  @Test
+  void testGenerateDropTableSqlWithCluster() {
+    Map<String, String> props = new HashMap<>();
+    props.put(ClusterConstants.ON_CLUSTER, "true");
+    props.put(ClusterConstants.CLUSTER_NAME, "ck_cluster");
+
+    String sql = ops.buildDropSql("orders", props);
+    Assertions.assertEquals("DROP TABLE `orders` ON CLUSTER `ck_cluster` SYNC", sql);
+  }
+
+  @Test
+  void testGenerateDropTableSqlOnClusterWithoutClusterName() {
+    // on-cluster=true but no cluster-name → fall back to plain DROP TABLE
+    Map<String, String> props = new HashMap<>();
+    props.put(ClusterConstants.ON_CLUSTER, "true");
+
+    String sql = ops.buildDropSql("orders", props);
+    Assertions.assertEquals("DROP TABLE `orders`", sql);
+  }
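The DROP TABLE branching covered by the four tests above can be sketched as a small standalone helper. This is a hedged sketch, not the actual `generateDropTableSql` implementation: `DropSqlSketch` and its constants are hypothetical, and only the observable behavior asserted in the tests is reproduced.

```java
import java.util.Map;

// Hypothetical sketch of the DROP TABLE branching exercised by the tests:
// plain drop unless on-cluster=true AND a cluster name is present, in which
// case ON CLUSTER ... SYNC is appended. Names are illustrative only.
public class DropSqlSketch {
  // Assumed property keys, mirroring ClusterConstants in the patch.
  static final String ON_CLUSTER = "on-cluster";
  static final String CLUSTER_NAME = "cluster-name";

  static String buildDropSql(String tableName, Map<String, String> props) {
    String base = "DROP TABLE `" + tableName + "`";
    // Null properties or on-cluster != "true" -> plain DROP TABLE.
    if (props == null || !Boolean.parseBoolean(props.get(ON_CLUSTER))) {
      return base;
    }
    String cluster = props.get(CLUSTER_NAME);
    // on-cluster=true without a cluster name falls back to a plain drop.
    if (cluster == null || cluster.isEmpty()) {
      return base;
    }
    // SYNC makes the drop wait until it has been applied on all replicas.
    return base + " ON CLUSTER `" + cluster + "` SYNC";
  }

  public static void main(String[] args) {
    System.out.println(buildDropSql("orders", null));
    System.out.println(buildDropSql("orders", Map.of(ON_CLUSTER, "true", CLUSTER_NAME, "ck_cluster")));
  }
}
```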
+
+  // ---------------------------------------------------------------------------
+  // Cluster metadata embedded in COMMENT
+  // ---------------------------------------------------------------------------
+
+  /**
+   * When a MergeTree table is created ON CLUSTER, the cluster name must be embedded in the stored
+   * COMMENT so it can be recovered at DROP/load time (SHOW CREATE TABLE omits ON CLUSTER).
+   */
+  @Test
+  void testCreateTableOnClusterEmbedsClusterNameInComment() {
+    JdbcColumn[] columns =
+        new JdbcColumn[] {
+          JdbcColumn.builder()
+              .withName("id")
+              .withType(Types.IntegerType.get())
+              .withNullable(false)
+              .build()
+        };
+
+    Map<String, String> props = new HashMap<>();
+    props.put(ClusterConstants.CLUSTER_NAME, "ck_cluster");
+    props.put(ClusterConstants.ON_CLUSTER, "true");
+    props.put(GRAVITINO_ENGINE_KEY, "MergeTree");
+
+    String sql =
+        ops.buildCreateSql(
+            "tbl",
+            columns,
+            "user comment",
+            props,
+            null,
+            Distributions.NONE,
+            new Index[0],
+            new SortOrder[] {SortOrders.ascending(NamedReference.field("id"))});
+
+    // CREATE TABLE clause must have ON CLUSTER
+    Assertions.assertTrue(
+        sql.contains("ON CLUSTER `ck_cluster`"),
+        "SQL must contain ON CLUSTER `ck_cluster`; got: " + sql);
+
+    // The COMMENT clause must contain the embedded cluster metadata
+    String expectedPrefix = CLUSTER_META_PREFIX + "ck_cluster";
+    Assertions.assertTrue(
+        sql.contains(expectedPrefix),
+        "SQL COMMENT must contain cluster metadata '" + expectedPrefix + "'; got: " + sql);
+
+    // User comment must be preserved before the separator
+    Assertions.assertTrue(
+        sql.contains("user comment"),
+        "SQL COMMENT must still contain the original user comment; got: " + sql);
+  }
+
+  /** Non-cluster table must NOT embed any cluster metadata in the COMMENT. */
+  @Test
+  void testCreateTableWithoutClusterDoesNotEmbedClusterMetadata() {
+    JdbcColumn[] columns =
+        new JdbcColumn[] {
+          JdbcColumn.builder()
+              .withName("id")
+              .withType(Types.IntegerType.get())
+              .withNullable(false)
+              .build()
+        };
+
+    Map<String, String> props = new HashMap<>();
+    props.put(GRAVITINO_ENGINE_KEY, "MergeTree");
+
+    String sql =
+        ops.buildCreateSql(
+            "tbl",
+            columns,
+            "plain comment",
+            props,
+            null,
+            Distributions.NONE,
+            new Index[0],
+            new SortOrder[] {SortOrders.ascending(NamedReference.field("id"))});
+
+    Assertions.assertFalse(
+        sql.contains(ClickHouseClusterUtils.CLUSTER_META_PREFIX),
+        "Non-cluster table must not embed cluster metadata; got: " + sql);
+    Assertions.assertTrue(
+        sql.contains("plain comment"), "User comment must be present unmodified; got: " + sql);
+  }
+
+  /** Cluster metadata round-trip: embed → extract → strip. */
+  @Test
+  void testClusterMetadataRoundTrip() {
+    String stored = ClickHouseClusterUtils.embedClusterInComment("my comment", "ck_cluster");
+    Assertions.assertEquals("ck_cluster", extractClusterFromComment(stored));
+    Assertions.assertEquals("my comment", stripClusterMetadata(stored));
+  }
+
+  /** When no user comment is provided, only the cluster metadata token is stored. */
+  @Test
+  void testClusterMetadataRoundTripNullComment() {
+    String stored = ClickHouseClusterUtils.embedClusterInComment(null, "ck_cluster");
+    Assertions.assertEquals("ck_cluster", extractClusterFromComment(stored));
+    Assertions.assertEquals("", stripClusterMetadata(stored));
+  }
+
   private static class TestableClickHouseTableOperations extends ClickHouseTableOperations {
     String buildCreateSql(
         String tableName,
@@ -232,5 +373,9 @@ class TestClickHouseTableOperationsCluster {
       return generateCreateTableSql(
           tableName, columns, comment, properties, partitioning, distribution, indexes, sortOrders);
     }
+
+    String buildDropSql(String tableName, Map<String, String> properties) {
+      return generateDropTableSql(tableName, properties);
+    }
   }
 }
diff --git a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalogOperations.java b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalogOperations.java
index e9716749fa..e7d4fde6a5 100644
--- a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalogOperations.java
+++ b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalogOperations.java
@@ -524,6 +524,8 @@ public class JdbcCatalogOperations implements CatalogOperations, SupportsSchemas
         .withComment(comment)
         .withProperties(jdbcTablePropertiesMetadata.convertFromJdbcProperties(resultProperties))
         .withPartitioning(partitioning)
+        .withSortOrders(sortOrders)
+        .withDistribution(distribution)
         .withIndexes(indexes)
         .withDatabaseName(databaseName)
         .withTableOperation(tableOperation)
diff --git a/docs/jdbc-clickhouse-catalog.md b/docs/jdbc-clickhouse-catalog.md
index d50a952353..7be0533219 100644
--- a/docs/jdbc-clickhouse-catalog.md
+++ b/docs/jdbc-clickhouse-catalog.md
@@ -120,6 +120,10 @@ See [Manage Relational Metadata Using Gravitino](./manage-relational-metadata-us
 | `on-cluster`   | Use `ON CLUSTER` when creating the database                                         | `false` | No | No | 1.2.0 |
 | `cluster-name` | Cluster name used with `ON CLUSTER` (must align with table-level cluster settings)  | (none)  | No | No | 1.2.0 |
 
+:::warning
+**Cluster properties only reflect Gravitino-managed schemas.** Gravitino embeds the cluster name inside the schema's `COMMENT` field at creation time (because `SHOW CREATE DATABASE` does not include `ON CLUSTER` for standard Atomic databases). Schemas created outside Gravitino will not have this metadata, so `on-cluster` and `cluster-name` will be absent when loaded, and `DROP SCHEMA` will not propagate `ON CLUSTER` to other cluster nodes.
+:::
+
 ### Create a schema
 
 <Tabs groupId="language" queryString>
@@ -156,15 +160,15 @@ See [Manage Relational Metadata Using Gravitino](./manage-relational-metadata-us
 
 ### Table capabilities
 
-| Area               | Details |
-|--------------------|---------|
-| Mapping            | Gravitino table maps to a ClickHouse table |
-| Engines            | Local engines: MergeTree family (`MergeTree` default, `ReplacingMergeTree`, `SummingMergeTree`, `AggregatingMergeTree`, `CollapsingMergeTree`, `VersionedCollapsingMergeTree`, `GraphiteMergeTree`), Tiny/Stripe/Log, Memory, File, Null, Set, Join, View, Buffer, KeeperMap, etc. Distributed engine supports cluster mode with remote database/table and sharding key. |
-| Ordering/Partition | MergeTree-family requires exactly one `ORDER BY` column; only single-column identity `PARTITION BY` is supported on MergeTree engines. Other engines reject `ORDER BY`/`PARTITION BY`. |
-| Indexes            | Primary key; data-skipping indexes `DATA_SKIPPING_MINMAX` and `DATA_SKIPPING_BLOOM_FILTER` (fixed granularities). |
-| Distribution       | Gravitino enforces `Distributions.NONE`; no custom distribution strategies. |
-| Column defaults    | Supported. |
-| Unsupported        | Engine change after creation; removing table properties; auto-increment columns. |
+| Area               | Details |
+|--------------------|---------|
+| Mapping            | Gravitino table maps to a ClickHouse table |
+| Engines            | **MergeTree family** (`MergeTree` default, `ReplacingMergeTree`, `SummingMergeTree`, `AggregatingMergeTree`, `CollapsingMergeTree`, `VersionedCollapsingMergeTree`, `GraphiteMergeTree`): fully supported, data persists across restarts. **Log family** (`TinyLog`, `StripeLog`, `Log`): supported, data and table definition persist across restarts. **`Null`**: supported, table persists, data is always discarded by design. **`Set`**: supported, table definition persists. [...]
+| Ordering/Partition | MergeTree-family requires exactly one `ORDER BY` column; only single-column identity `PARTITION BY` is supported on MergeTree engines. Other engines reject `ORDER BY`/`PARTITION BY`. |
+| Indexes            | Primary key; data-skipping indexes `DATA_SKIPPING_MINMAX` and `DATA_SKIPPING_BLOOM_FILTER` (fixed granularities). |
+| Distribution       | Gravitino enforces `Distributions.NONE`; no custom distribution strategies. |
+| Column defaults    | Supported. |
+| Unsupported        | Engine change after creation; removing table properties; auto-increment columns. |
 
 ### Table column types
 
@@ -193,9 +197,23 @@ Other ClickHouse types are exposed as [External Type](./manage-relational-metada
 ### Table properties
 
 :::note
-- `settings.*` keys are passed to the ClickHouse `SETTINGS` clause verbatim.  
+- `settings.*` keys are passed to the ClickHouse `SETTINGS` clause verbatim.
 - The `engine` value is immutable after creation.
-- When loading table metadata, Gravitino cannot determine whether it is a cluster table or a local table, because properties such as `cluster-name` and `on-cluster` are not available from the JDBC metadata.
+:::
+
+:::warning
+**Cluster properties only reflect Gravitino-managed objects.**
+ClickHouse does not persist `ON CLUSTER` information in `SHOW CREATE TABLE` or `SHOW CREATE DATABASE` output for non-Replicated objects. Gravitino works around this by embedding the cluster name in the object's `COMMENT` field at creation time and reading it back on load/drop.
+
+This means:
+- **Gravitino-created databases and tables**: `on-cluster` and `cluster-name` properties are accurate.
+- **Databases or tables created outside Gravitino** (e.g., via ClickHouse client, migration scripts, or other tools): `on-cluster` will be `false` and `cluster-name` will be absent, regardless of whether the object was actually created `ON CLUSTER`. Subsequent `DROP DATABASE` / `DROP TABLE` operations performed through Gravitino will **not** include `ON CLUSTER`, which may leave orphan objects on non-coordinating cluster nodes.
+
+If you need Gravitino to manage an existing cluster database or table, recreate it through the Gravitino API so the cluster metadata is properly embedded.
+:::
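The embed/extract/strip round trip described in the warning above (and asserted in `testClusterMetadataRoundTrip`) can be sketched as a tiny codec. This is a hypothetical illustration: the class name, method names, and especially the separator token are assumptions — the real marker lives in `ClickHouseClusterUtils.CLUSTER_META_PREFIX` and may differ. Only the round-trip behavior shown in the tests is reproduced.

```java
// Hypothetical sketch of embedding a cluster name in a COMMENT string.
// The PREFIX token is an assumption; the real CLUSTER_META_PREFIX in
// ClickHouseClusterUtils may use a different marker.
public class ClusterCommentCodec {
  static final String PREFIX = "__gravitino_cluster:";

  // Append the cluster marker after the (possibly absent) user comment.
  static String embed(String userComment, String clusterName) {
    String base = userComment == null ? "" : userComment;
    return base + PREFIX + clusterName;
  }

  // Recover the cluster name from a stored comment, or null if absent.
  static String extract(String storedComment) {
    int i = storedComment.lastIndexOf(PREFIX);
    return i < 0 ? null : storedComment.substring(i + PREFIX.length());
  }

  // Return the user-visible comment with the cluster marker removed.
  static String strip(String storedComment) {
    int i = storedComment.lastIndexOf(PREFIX);
    return i < 0 ? storedComment : storedComment.substring(0, i);
  }

  public static void main(String[] args) {
    String stored = embed("my comment", "ck_cluster");
    System.out.println(extract(stored)); // the embedded cluster name
    System.out.println(strip(stored));   // the original user comment
  }
}
```

An unmanaged object whose comment lacks the marker simply yields no cluster name on extract and an unchanged comment on strip, which is why externally created tables load with `on-cluster=false`.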
+
+:::warning
+**Memory engine data volatility**: Tables created with `engine=Memory` store data in RAM only. After a ClickHouse server restart the table definition persists (Gravitino's `loadTable` succeeds), but all data is permanently lost. Gravitino metadata and ClickHouse remain consistent at the schema level, but users are responsible for repopulating data after restarts. Consider using `TinyLog`, `StripeLog`, or a MergeTree-family engine if data durability is required.
 :::
 
 | Property Name | Description | Default Value | Required | Reserved | Immutable | Since version |
@@ -322,7 +340,7 @@ Supported:
 - Rename column.
 - Update column type/comment/default/position/nullability.
 - Delete columns (with `IF EXISTS` support).
-- Add primary or data-skipping indexes; drop data-skipping indexes.
+- Add data-skipping indexes; drop data-skipping indexes. Adding/dropping primary key is not supported.
 - Update table comment.
 
 Unsupported:
diff --git a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestDatabaseName.java b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestDatabaseName.java
index 06ecb30154..610bb6730c 100644
--- a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestDatabaseName.java
+++ b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/TestDatabaseName.java
@@ -120,6 +120,7 @@ public enum TestDatabaseName {
   CLICKHOUSE_CATALOG_CLICKHOUSE_IT,
   CLICKHOUSE_AUDIT_CATALOG_CLICKHOUSE_IT,
   CLICKHOUSE_CLUSTER_CLICKHOUSE_IT,
+  CLICKHOUSE_ENGINE_IT,
 
   /** Represents the PostgreSQL database for partition statistics integration tests. */
   PG_TEST_PARTITION_STATS {
