yuqi1129 commented on code in PR #10068:
URL: https://github.com/apache/gravitino/pull/10068#discussion_r2894974883


##########
catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java:
##########
@@ -34,17 +68,123 @@
  * <p>Hologres is PostgreSQL-compatible, so most table operations follow PostgreSQL conventions.
  * However, Hologres has specific features like table properties (orientation, distribution_key,
  * etc.) that are handled through the WITH clause in CREATE TABLE statements.
- *
- * <p>TODO: Full implementation will be added in a follow-up PR.
  */
 public class HologresTableOperations extends JdbcTableOperations
     implements RequireDatabaseOperation {
 
-  public static final String HOLO_QUOTE = "\"";
+  public static final String NEW_LINE = "\n";
+  public static final String ALTER_TABLE = "ALTER TABLE ";
+  public static final String ALTER_COLUMN = "ALTER COLUMN ";
+  public static final String IS = " IS '";
+  public static final String COLUMN_COMMENT = "COMMENT ON COLUMN ";
+  public static final String TABLE_COMMENT = "COMMENT ON TABLE ";
+
+  private static final String HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG =
+      "Hologres does not support nested column names.";
+
+  /** Properties that are handled separately or read-only, excluded from the WITH clause. */
+  private static final Set<String> EXCLUDED_TABLE_PROPERTIES =
+      ImmutableSet.of("distribution_key", "is_logical_partitioned_table", "primary_key");
+
+  /** Properties that are meaningful for users, filtering out internal system properties. */
+  private static final Set<String> USER_RELEVANT_PROPERTIES =
+      ImmutableSet.of(
+          "orientation",
+          "clustering_key",
+          "segment_key",
+          "bitmap_columns",
+          "dictionary_encoding_columns",
+          "time_to_live_in_seconds",
+          "table_group",
+          "storage_format",
+          "binlog.level",
+          "binlog.ttl",
+          "is_logical_partitioned_table",
+          "partition_expiration_time",
+          "partition_keep_hot_window",
+          "partition_require_filter",
+          "partition_generate_binlog_window");
+
+  private String database;
+  private HologresSchemaOperations schemaOperations;
+
+  @Override
+  protected String quoteIdentifier(String identifier) {
+    return "\"" + identifier + "\"";

Review Comment:
   **Bug (Medium):** `quoteIdentifier` does not escape double quotes that may appear inside the identifier. If the identifier contains a `"` character, the generated SQL is truncated or syntactically invalid.
   
   The standard PostgreSQL approach is to double any embedded double quotes:
   ```java
   return "\"" + identifier.replace("\"", "\"\"") + "\"";
   ```
   
   Table and column names rarely contain double quotes in practice, but adding this is recommended as defensive programming.
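   A quick standalone sketch of the suggested behavior (hypothetical `QuoteIdentifierDemo` helper class, not part of the PR), showing how doubling the embedded quote keeps the identifier intact:

   ```java
   public class QuoteIdentifierDemo {
     // Suggested fix: double any embedded double quotes before wrapping,
     // per PostgreSQL's quoted-identifier rules.
     static String quoteIdentifier(String identifier) {
       return "\"" + identifier.replace("\"", "\"\"") + "\"";
     }

     public static void main(String[] args) {
       // A plain identifier is simply wrapped.
       System.out.println(quoteIdentifier("my_table"));   // "my_table"
       // An embedded quote is doubled instead of terminating the identifier.
       System.out.println(quoteIdentifier("my\"table"));  // "my""table"
     }
   }
   ```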



##########
catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java:
##########
@@ -34,17 +68,123 @@
  * <p>Hologres is PostgreSQL-compatible, so most table operations follow PostgreSQL conventions.
  * However, Hologres has specific features like table properties (orientation, distribution_key,
  * etc.) that are handled through the WITH clause in CREATE TABLE statements.
- *
- * <p>TODO: Full implementation will be added in a follow-up PR.
  */
 public class HologresTableOperations extends JdbcTableOperations
     implements RequireDatabaseOperation {
 
-  public static final String HOLO_QUOTE = "\"";
+  public static final String NEW_LINE = "\n";
+  public static final String ALTER_TABLE = "ALTER TABLE ";
+  public static final String ALTER_COLUMN = "ALTER COLUMN ";

Review Comment:
   **Nit:** The four constants `ALTER_COLUMN`, `IS`, `COLUMN_COMMENT`, and `TABLE_COMMENT` are declared but never used — the code builds those statements with `String.format` and inline literals instead.
   
   Either use these constants consistently throughout the code (for consistency), or remove them as dead code.
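   For illustration, a minimal sketch of the "use the constants" option (hypothetical `CommentSqlDemo` class; the constant values are copied from this file):

   ```java
   public class CommentSqlDemo {
     // Constants as declared in HologresTableOperations.
     public static final String TABLE_COMMENT = "COMMENT ON TABLE ";
     public static final String COLUMN_COMMENT = "COMMENT ON COLUMN ";
     public static final String IS = " IS '";

     // Build COMMENT ON TABLE from the constants instead of an inline literal.
     static String tableCommentSql(String quotedTable, String escapedComment) {
       return TABLE_COMMENT + quotedTable + IS + escapedComment + "';";
     }

     // Same idea for COMMENT ON COLUMN.
     static String columnCommentSql(String quotedTable, String quotedColumn, String escapedComment) {
       return COLUMN_COMMENT + quotedTable + "." + quotedColumn + IS + escapedComment + "';";
     }

     public static void main(String[] args) {
       System.out.println(tableCommentSql("\"t1\"", "demo table"));
       // COMMENT ON TABLE "t1" IS 'demo table';
       System.out.println(columnCommentSql("\"t1\"", "\"c1\"", "demo column"));
       // COMMENT ON COLUMN "t1"."c1" IS 'demo column';
     }
   }
   ```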



##########
catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java:
##########
@@ -34,17 +68,123 @@
  * <p>Hologres is PostgreSQL-compatible, so most table operations follow PostgreSQL conventions.
  * However, Hologres has specific features like table properties (orientation, distribution_key,
  * etc.) that are handled through the WITH clause in CREATE TABLE statements.
- *
- * <p>TODO: Full implementation will be added in a follow-up PR.
  */
 public class HologresTableOperations extends JdbcTableOperations
     implements RequireDatabaseOperation {
 
-  public static final String HOLO_QUOTE = "\"";
+  public static final String NEW_LINE = "\n";
+  public static final String ALTER_TABLE = "ALTER TABLE ";
+  public static final String ALTER_COLUMN = "ALTER COLUMN ";
+  public static final String IS = " IS '";
+  public static final String COLUMN_COMMENT = "COMMENT ON COLUMN ";
+  public static final String TABLE_COMMENT = "COMMENT ON TABLE ";
+
+  private static final String HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG =
+      "Hologres does not support nested column names.";
+
+  /** Properties that are handled separately or read-only, excluded from the WITH clause. */
+  private static final Set<String> EXCLUDED_TABLE_PROPERTIES =
+      ImmutableSet.of("distribution_key", "is_logical_partitioned_table", "primary_key");
+
+  /** Properties that are meaningful for users, filtering out internal system properties. */
+  private static final Set<String> USER_RELEVANT_PROPERTIES =
+      ImmutableSet.of(
+          "orientation",
+          "clustering_key",
+          "segment_key",
+          "bitmap_columns",
+          "dictionary_encoding_columns",
+          "time_to_live_in_seconds",
+          "table_group",
+          "storage_format",
+          "binlog.level",
+          "binlog.ttl",
+          "is_logical_partitioned_table",
+          "partition_expiration_time",
+          "partition_keep_hot_window",
+          "partition_require_filter",
+          "partition_generate_binlog_window");
+
+  private String database;
+  private HologresSchemaOperations schemaOperations;
+
+  @Override
+  protected String quoteIdentifier(String identifier) {
+    return "\"" + identifier + "\"";
+  }
+
+  @Override
+  public void initialize(
+      DataSource dataSource,
+      JdbcExceptionConverter exceptionMapper,
+      JdbcTypeConverter jdbcTypeConverter,
+      JdbcColumnDefaultValueConverter jdbcColumnDefaultValueConverter,
+      Map<String, String> conf) {
+    super.initialize(
+        dataSource, exceptionMapper, jdbcTypeConverter, jdbcColumnDefaultValueConverter, conf);
+    database = new JdbcConfig(conf).getJdbcDatabase();
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(database),
+        "The `jdbc-database` configuration item is mandatory in Hologres.");
+  }
 
   @Override
   public void setDatabaseOperation(DatabaseOperation databaseOperation) {
-    // Will be implemented in a follow-up PR.
+    this.schemaOperations = (HologresSchemaOperations) databaseOperation;
+  }
+
+  @Override
+  public List<String> listTables(String schemaName) throws NoSuchSchemaException {
+    try (Connection connection = getConnection(schemaName)) {
+      if (!schemaOperations.schemaExists(connection, schemaName)) {
+        throw new NoSuchSchemaException("No such schema: %s", schemaName);
+      }
+      final List<String> names = Lists.newArrayList();
+      try (ResultSet tables = getTables(connection)) {
+        while (tables.next()) {
+          if (Objects.equals(tables.getString("TABLE_SCHEM"), schemaName)) {
+            names.add(tables.getString("TABLE_NAME"));
+          }
+        }
+      }
+      LOG.info("Finished listing tables size {} for schema name {} ", names.size(), schemaName);
+      return names;
+    } catch (final SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);
+    }
+  }
+
+  @Override
+  protected JdbcTable.Builder getTableBuilder(
+      ResultSet tablesResult, String databaseName, String tableName) throws SQLException {
+    boolean found = false;
+    JdbcTable.Builder builder = null;
+    while (tablesResult.next() && !found) {
+      String tableNameInResult = tablesResult.getString("TABLE_NAME");
+      String tableSchemaInResultLowerCase = tablesResult.getString("TABLE_SCHEM");

Review Comment:
   **Nit:** The variable name `tableSchemaInResultLowerCase` is misleading — no `toLowerCase` conversion happens here; the value of `TABLE_SCHEM` is taken as-is. Consider renaming it to `tableSchemaInResult`.



##########
catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java:
##########
@@ -56,20 +196,838 @@ protected String generateCreateTableSql(
       Transform[] partitioning,
       Distribution distribution,
       Index[] indexes) {
-    throw new UnsupportedOperationException(
-        "Hologres table creation will be implemented in a follow-up PR.");
+    boolean isLogicalPartition =
+        MapUtils.isNotEmpty(properties)
+            && "true".equalsIgnoreCase(properties.get("is_logical_partitioned_table"));
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append(String.format("CREATE TABLE %s (%s", quoteIdentifier(tableName), NEW_LINE));
+
+    // Add columns
+    for (int i = 0; i < columns.length; i++) {
+      JdbcColumn column = columns[i];
+      sqlBuilder.append(String.format("    %s", quoteIdentifier(column.name())));
+
+      appendColumnDefinition(column, sqlBuilder);
+      // Add a comma for the next column, unless it's the last one
+      if (i < columns.length - 1) {
+        sqlBuilder.append(String.format(",%s", NEW_LINE));
+      }
+    }
+    appendIndexesSql(indexes, sqlBuilder);
+    sqlBuilder.append(String.format("%s)", NEW_LINE));
+
+    // Append partitioning clause if specified
+    if (ArrayUtils.isNotEmpty(partitioning)) {
+      appendPartitioningSql(partitioning, isLogicalPartition, sqlBuilder);
+    }
+
+    // Build WITH clause combining distribution and Hologres-specific table properties
+    // Supported properties: orientation, distribution_key, clustering_key, event_time_column,
+    // bitmap_columns, dictionary_encoding_columns, time_to_live_in_seconds, table_group, etc.
+    List<String> withEntries = new ArrayList<>();
+
+    // Add distribution_key from Distribution parameter
+    if (!Distributions.NONE.equals(distribution)) {
+      validateDistribution(distribution);
+      String distributionColumns =
+          Arrays.stream(distribution.expressions())
+              .map(
+                  expression -> {
+                    Preconditions.checkArgument(
+                        expression instanceof NamedReference,
+                        "Hologres distribution expressions must be simple column references");
+                    String[] fieldNames = ((NamedReference) expression).fieldName();
+                    Preconditions.checkArgument(
+                        fieldNames != null && fieldNames.length == 1,
+                        "Hologres distribution expressions must reference a single column");
+                    return fieldNames[0];
+                  })
+              .collect(Collectors.joining(","));
+      withEntries.add(String.format("distribution_key = '%s'", distributionColumns));
+    }
+
+    // Add user-specified properties (filter out read-only / internally-handled properties)
+    if (MapUtils.isNotEmpty(properties)) {
+      properties.forEach(
+          (key, value) -> {
+            if (!EXCLUDED_TABLE_PROPERTIES.contains(key)) {
+              withEntries.add(String.format("%s = '%s'", key, value));

Review Comment:
   **Bug (High):** Neither the property key nor the value is escaped or validated before being concatenated into the WITH clause of the SQL.
   
   1. If `value` contains a single quote `'`, the generated SQL is syntactically invalid and may even allow injection.
   2. `key` is concatenated directly as well, so a malicious key can inject arbitrary SQL.
   
   Suggestions:
   - Escape `value` with `value.replace("'", "''")` (consistent with the handling in the COMMENT statements)
   - Validate `key` against a whitelist, or at least check that it is a legal identifier
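   A minimal sketch of both suggestions combined (hypothetical `WithClauseDemo` helper; `ALLOWED_KEYS` stands in for a whitelist such as the class's `USER_RELEVANT_PROPERTIES` set):

   ```java
   import java.util.Set;
   import java.util.regex.Pattern;

   public class WithClauseDemo {
     // Hypothetical whitelist standing in for USER_RELEVANT_PROPERTIES.
     static final Set<String> ALLOWED_KEYS = Set.of("orientation", "table_group", "binlog.level");
     // Conservative identifier shape covering dotted keys like "binlog.level".
     static final Pattern KEY_PATTERN = Pattern.compile("[a-z_][a-z0-9_.]*");

     static String withEntry(String key, String value) {
       // Reject keys that are not legal identifiers or not whitelisted.
       if (!KEY_PATTERN.matcher(key).matches() || !ALLOWED_KEYS.contains(key)) {
         throw new IllegalArgumentException("Illegal table property key: " + key);
       }
       // Double embedded single quotes, matching the COMMENT handling in this class.
       return String.format("%s = '%s'", key, value.replace("'", "''"));
     }

     public static void main(String[] args) {
       System.out.println(withEntry("orientation", "column"));  // orientation = 'column'
       System.out.println(withEntry("table_group", "tg's"));    // table_group = 'tg''s'
     }
   }
   ```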



##########
catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java:
##########
@@ -34,17 +68,123 @@
  * <p>Hologres is PostgreSQL-compatible, so most table operations follow PostgreSQL conventions.
  * However, Hologres has specific features like table properties (orientation, distribution_key,
  * etc.) that are handled through the WITH clause in CREATE TABLE statements.
- *
- * <p>TODO: Full implementation will be added in a follow-up PR.
  */
 public class HologresTableOperations extends JdbcTableOperations
     implements RequireDatabaseOperation {
 
-  public static final String HOLO_QUOTE = "\"";
+  public static final String NEW_LINE = "\n";
+  public static final String ALTER_TABLE = "ALTER TABLE ";
+  public static final String ALTER_COLUMN = "ALTER COLUMN ";
+  public static final String IS = " IS '";
+  public static final String COLUMN_COMMENT = "COMMENT ON COLUMN ";
+  public static final String TABLE_COMMENT = "COMMENT ON TABLE ";
+
+  private static final String HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG =
+      "Hologres does not support nested column names.";
+
+  /** Properties that are handled separately or read-only, excluded from the WITH clause. */
+  private static final Set<String> EXCLUDED_TABLE_PROPERTIES =
+      ImmutableSet.of("distribution_key", "is_logical_partitioned_table", "primary_key");
+
+  /** Properties that are meaningful for users, filtering out internal system properties. */
+  private static final Set<String> USER_RELEVANT_PROPERTIES =
+      ImmutableSet.of(
+          "orientation",
+          "clustering_key",
+          "segment_key",
+          "bitmap_columns",
+          "dictionary_encoding_columns",
+          "time_to_live_in_seconds",
+          "table_group",
+          "storage_format",
+          "binlog.level",
+          "binlog.ttl",
+          "is_logical_partitioned_table",
+          "partition_expiration_time",
+          "partition_keep_hot_window",
+          "partition_require_filter",
+          "partition_generate_binlog_window");
+
+  private String database;
+  private HologresSchemaOperations schemaOperations;
+
+  @Override
+  protected String quoteIdentifier(String identifier) {
+    return "\"" + identifier + "\"";
+  }
+
+  @Override
+  public void initialize(
+      DataSource dataSource,
+      JdbcExceptionConverter exceptionMapper,
+      JdbcTypeConverter jdbcTypeConverter,
+      JdbcColumnDefaultValueConverter jdbcColumnDefaultValueConverter,
+      Map<String, String> conf) {
+    super.initialize(
+        dataSource, exceptionMapper, jdbcTypeConverter, jdbcColumnDefaultValueConverter, conf);
+    database = new JdbcConfig(conf).getJdbcDatabase();
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(database),
+        "The `jdbc-database` configuration item is mandatory in Hologres.");
+  }
 
   @Override
   public void setDatabaseOperation(DatabaseOperation databaseOperation) {
-    // Will be implemented in a follow-up PR.
+    this.schemaOperations = (HologresSchemaOperations) databaseOperation;
+  }
+
+  @Override
+  public List<String> listTables(String schemaName) throws NoSuchSchemaException {
+    try (Connection connection = getConnection(schemaName)) {
+      if (!schemaOperations.schemaExists(connection, schemaName)) {
+        throw new NoSuchSchemaException("No such schema: %s", schemaName);
+      }
+      final List<String> names = Lists.newArrayList();
+      try (ResultSet tables = getTables(connection)) {
+        while (tables.next()) {
+          if (Objects.equals(tables.getString("TABLE_SCHEM"), schemaName)) {
+            names.add(tables.getString("TABLE_NAME"));
+          }
+        }
+      }
+      LOG.info("Finished listing tables size {} for schema name {} ", names.size(), schemaName);

Review Comment:
   **Nit:** `LOG.info` in a frequently called method like `listTables` is noisy; consider downgrading it to `LOG.debug`. The same issue appears at line 297 (`generateCreateTableSql`) and line 498 (`generateAlterTableSql`).
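   A small sketch of the suggestion (hypothetical demo using `java.util.logging` as a stand-in for the class's `LOG`): keep the message, but emit it at a debug-level severity so hot paths stay quiet by default.

   ```java
   import java.util.logging.Level;
   import java.util.logging.Logger;

   public class ListTablesLogDemo {
     private static final Logger LOG = Logger.getLogger(ListTablesLogDemo.class.getName());

     // Building the message separately keeps it testable.
     static String listTablesMessage(int size, String schemaName) {
       return String.format("Finished listing tables size %d for schema name %s", size, schemaName);
     }

     static void logListTables(int size, String schemaName) {
       // FINE is java.util.logging's analogue of slf4j's debug level:
       // suppressed under the default INFO threshold, visible when debugging.
       if (LOG.isLoggable(Level.FINE)) {
         LOG.fine(listTablesMessage(size, schemaName));
       }
     }

     public static void main(String[] args) {
       logListTables(3, "public");  // silent under the default log level
     }
   }
   ```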



##########
catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java:
##########
@@ -56,20 +196,838 @@ protected String generateCreateTableSql(
       Transform[] partitioning,
       Distribution distribution,
       Index[] indexes) {
-    throw new UnsupportedOperationException(
-        "Hologres table creation will be implemented in a follow-up PR.");
+    boolean isLogicalPartition =
+        MapUtils.isNotEmpty(properties)
+            && 
"true".equalsIgnoreCase(properties.get("is_logical_partitioned_table"));
+    StringBuilder sqlBuilder = new StringBuilder();
+    sqlBuilder.append(String.format("CREATE TABLE %s (%s", 
quoteIdentifier(tableName), NEW_LINE));
+
+    // Add columns
+    for (int i = 0; i < columns.length; i++) {
+      JdbcColumn column = columns[i];
+      sqlBuilder.append(String.format("    %s", 
quoteIdentifier(column.name())));
+
+      appendColumnDefinition(column, sqlBuilder);
+      // Add a comma for the next column, unless it's the last one
+      if (i < columns.length - 1) {
+        sqlBuilder.append(String.format(",%s", NEW_LINE));
+      }
+    }
+    appendIndexesSql(indexes, sqlBuilder);
+    sqlBuilder.append(String.format("%s)", NEW_LINE));
+
+    // Append partitioning clause if specified
+    if (ArrayUtils.isNotEmpty(partitioning)) {
+      appendPartitioningSql(partitioning, isLogicalPartition, sqlBuilder);
+    }
+
+    // Build WITH clause combining distribution and Hologres-specific table 
properties
+    // Supported properties: orientation, distribution_key, clustering_key, 
event_time_column,
+    // bitmap_columns, dictionary_encoding_columns, time_to_live_in_seconds, 
table_group, etc.
+    List<String> withEntries = new ArrayList<>();
+
+    // Add distribution_key from Distribution parameter
+    if (!Distributions.NONE.equals(distribution)) {
+      validateDistribution(distribution);
+      String distributionColumns =
+          Arrays.stream(distribution.expressions())
+              .map(
+                  expression -> {
+                    Preconditions.checkArgument(
+                        expression instanceof NamedReference,
+                        "Hologres distribution expressions must be simple 
column references");
+                    String[] fieldNames = ((NamedReference) 
expression).fieldName();
+                    Preconditions.checkArgument(
+                        fieldNames != null && fieldNames.length == 1,
+                        "Hologres distribution expressions must reference a 
single column");
+                    return fieldNames[0];
+                  })
+              .collect(Collectors.joining(","));
+      withEntries.add(String.format("distribution_key = '%s'", 
distributionColumns));
+    }
+
+    // Add user-specified properties (filter out read-only / 
internally-handled properties)
+    if (MapUtils.isNotEmpty(properties)) {
+      properties.forEach(
+          (key, value) -> {
+            if (!EXCLUDED_TABLE_PROPERTIES.contains(key)) {
+              withEntries.add(String.format("%s = '%s'", key, value));
+            }
+          });
+    }
+
+    // Generate WITH clause
+    if (!withEntries.isEmpty()) {
+      sqlBuilder.append(String.format("%sWITH (%s", NEW_LINE, NEW_LINE));
+      sqlBuilder.append(
+          withEntries.stream()
+              .map(entry -> String.format("    %s", entry))
+              .collect(Collectors.joining(String.format(",%s", NEW_LINE))));
+      sqlBuilder.append(String.format("%s)", NEW_LINE));
+    }
+
+    sqlBuilder.append(";");
+
+    // Add table comment if specified
+    if (StringUtils.isNotEmpty(comment)) {
+      String escapedComment = comment.replace("'", "''");
+      sqlBuilder
+          .append(NEW_LINE)
+          .append(
+              String.format(
+                  "COMMENT ON TABLE %s IS '%s';", quoteIdentifier(tableName), 
escapedComment));
+    }
+    Arrays.stream(columns)
+        .filter(jdbcColumn -> StringUtils.isNotEmpty(jdbcColumn.comment()))
+        .forEach(
+            jdbcColumn -> {
+              String escapedColComment = jdbcColumn.comment().replace("'", 
"''");
+              sqlBuilder
+                  .append(NEW_LINE)
+                  .append(
+                      String.format(
+                          "COMMENT ON COLUMN %s.%s IS '%s';",
+                          quoteIdentifier(tableName),
+                          quoteIdentifier(jdbcColumn.name()),
+                          escapedColComment));
+            });
+    // Return the generated SQL statement
+    String result = sqlBuilder.toString();
+
+    LOG.info("Generated create table:{} sql: {}", tableName, result);
+    return result;
+  }
+
+  @VisibleForTesting
+  static void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) {
+    for (Index index : indexes) {
+      String fieldStr = getIndexFieldStr(index.fieldNames());
+      sqlBuilder.append(String.format(",%s", NEW_LINE));
+      switch (index.type()) {
+        case PRIMARY_KEY:
+          sqlBuilder.append(String.format("PRIMARY KEY (%s)", fieldStr));
+          break;
+        default:
+          throw new IllegalArgumentException(
+              "Hologres only supports PRIMARY_KEY index, but got: " + 
index.type());
+      }
+    }
+  }
+
+  protected static String getIndexFieldStr(String[][] fieldNames) {
+    return Arrays.stream(fieldNames)
+        .map(
+            colNames -> {
+              if (colNames.length > 1) {
+                throw new IllegalArgumentException(
+                    "Index does not support complex fields in Hologres");
+              }
+              return "\"" + colNames[0] + "\"";
+            })
+        .collect(Collectors.joining(", "));
+  }
+
+  /**
+   * Append the partitioning clause to the CREATE TABLE SQL.
+   *
+   * <p>Hologres supports two types of partition tables:
+   *
+   * <ul>
+   *   <li>Physical partition table: uses {@code PARTITION BY LIST(column)} 
syntax
+   *   <li>Logical partition table (V3.1+): uses {@code LOGICAL PARTITION BY 
LIST(column1[,
+   *       column2])} syntax
+   * </ul>
+   *
+   * @param partitioning the partition transforms (only LIST partitioning is 
supported)
+   * @param isLogicalPartition whether to create a logical partition table
+   * @param sqlBuilder the SQL builder to append to
+   */
+  @VisibleForTesting
+  static void appendPartitioningSql(
+      Transform[] partitioning, boolean isLogicalPartition, StringBuilder 
sqlBuilder) {
+    Preconditions.checkArgument(
+        partitioning.length == 1,
+        "Hologres only supports single partition transform, but got %s",
+        partitioning.length);
+    Preconditions.checkArgument(
+        partitioning[0] instanceof Transforms.ListTransform,
+        "Hologres only supports LIST partitioning, but got %s",
+        partitioning[0].getClass().getSimpleName());
+
+    Transforms.ListTransform listTransform = (Transforms.ListTransform) 
partitioning[0];
+    String[][] fieldNames = listTransform.fieldNames();
+
+    Preconditions.checkArgument(fieldNames.length > 0, "Partition columns must 
not be empty");
+
+    if (isLogicalPartition) {
+      Preconditions.checkArgument(
+          fieldNames.length <= 2,
+          "Logical partition table supports at most 2 partition columns, but 
got: %s",
+          fieldNames.length);
+    } else {
+      Preconditions.checkArgument(
+          fieldNames.length == 1,
+          "Physical partition table supports exactly 1 partition column, but 
got: %s",
+          fieldNames.length);
+    }
+
+    String partitionColumns =
+        Arrays.stream(fieldNames)
+            .map(
+                colNames -> {
+                  Preconditions.checkArgument(
+                      colNames.length == 1,
+                      "Hologres partition does not support nested field 
names");
+                  return "\"" + colNames[0] + "\"";
+                })
+            .collect(Collectors.joining(", "));
+
+    sqlBuilder.append(NEW_LINE);
+    if (isLogicalPartition) {
+      sqlBuilder.append(String.format("LOGICAL PARTITION BY LIST(%s)", 
partitionColumns));
+    } else {
+      sqlBuilder.append(String.format("PARTITION BY LIST(%s)", 
partitionColumns));
+    }
+  }
+
+  private void appendColumnDefinition(JdbcColumn column, StringBuilder 
sqlBuilder) {
+    // Add data type
+    
sqlBuilder.append(SPACE).append(typeConverter.fromGravitino(column.dataType())).append(SPACE);
+
+    // Hologres does not support auto-increment columns via Gravitino
+    if (column.autoIncrement()) {
+      throw new IllegalArgumentException(
+          "Hologres does not support creating auto-increment columns via 
Gravitino, column: "
+              + column.name());
+    }
+
+    // Add NULL / NOT NULL constraint
+    if (column.nullable()) {
+      sqlBuilder.append("NULL ");
+    } else {
+      sqlBuilder.append("NOT NULL ");
+    }
+    // Add DEFAULT value if specified
+    appendDefaultValue(column, sqlBuilder);
   }
 
   @Override
-  protected String generateAlterTableSql(
-      String schemaName, String tableName, TableChange... changes) {
-    throw new UnsupportedOperationException(
-        "Hologres table alteration will be implemented in a follow-up PR.");
+  protected String generateRenameTableSql(String oldTableName, String 
newTableName) {
+    return String.format(
+        "%s%s RENAME TO %s",
+        ALTER_TABLE, quoteIdentifier(oldTableName), 
quoteIdentifier(newTableName));
+  }
+
+  @Override
+  protected String generateDropTableSql(String tableName) {
+    return String.format("DROP TABLE %s", quoteIdentifier(tableName));
   }
 
   @Override
   protected String generatePurgeTableSql(String tableName) {
     throw new UnsupportedOperationException(
         "Hologres does not support purge table in Gravitino, please use drop 
table");
   }
+
+  @Override
+  protected String generateAlterTableSql(
+      String schemaName, String tableName, TableChange... changes) {
+    // Not all operations require the original table information, so lazy 
loading is used here
+    JdbcTable lazyLoadTable = null;
+    List<String> alterSql = new ArrayList<>();
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.UpdateComment) {
+        lazyLoadTable = getOrCreateTable(schemaName, tableName, lazyLoadTable);
+        alterSql.add(updateCommentDefinition((TableChange.UpdateComment) 
change, lazyLoadTable));
+      } else if (change instanceof TableChange.SetProperty) {
+        throw new IllegalArgumentException("Set property is not supported 
yet");
+      } else if (change instanceof TableChange.RemoveProperty) {
+        throw new IllegalArgumentException("Remove property is not supported 
yet");
+      } else if (change instanceof TableChange.AddColumn) {
+        TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
+        lazyLoadTable = getOrCreateTable(schemaName, tableName, lazyLoadTable);
+        alterSql.addAll(addColumnFieldDefinition(addColumn, lazyLoadTable));
+      } else if (change instanceof TableChange.RenameColumn) {
+        TableChange.RenameColumn renameColumn = (TableChange.RenameColumn) 
change;
+        alterSql.add(renameColumnFieldDefinition(renameColumn, tableName));
+      } else if (change instanceof TableChange.UpdateColumnDefaultValue) {
+        throw new IllegalArgumentException(
+            "Hologres does not support altering column default value via ALTER 
TABLE.");
+      } else if (change instanceof TableChange.UpdateColumnType) {
+        throw new IllegalArgumentException(
+            "Hologres does not support altering column type via ALTER TABLE.");
+      } else if (change instanceof TableChange.UpdateColumnComment) {
+        alterSql.add(
+            updateColumnCommentFieldDefinition(
+                (TableChange.UpdateColumnComment) change, tableName));
+      } else if (change instanceof TableChange.UpdateColumnPosition) {
+        throw new IllegalArgumentException("Hologres does not support column 
position.");
+      } else if (change instanceof TableChange.DeleteColumn) {
+        lazyLoadTable = getOrCreateTable(schemaName, tableName, lazyLoadTable);
+        TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) 
change;
+        String deleteColSql = deleteColumnFieldDefinition(deleteColumn, 
lazyLoadTable);
+        if (StringUtils.isNotEmpty(deleteColSql)) {
+          alterSql.add(deleteColSql);
+        }
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        throw new IllegalArgumentException(
+            "Hologres does not support altering column nullability via ALTER 
TABLE.");
+      } else if (change instanceof TableChange.AddIndex) {
+        throw new IllegalArgumentException(
+            "Hologres does not support adding index via ALTER TABLE.");
+      } else if (change instanceof TableChange.DeleteIndex) {
+        throw new IllegalArgumentException(
+            "Hologres does not support deleting index via ALTER TABLE.");
+      } else if (change instanceof TableChange.UpdateColumnAutoIncrement) {
+        throw new IllegalArgumentException(
+            "Hologres does not support altering column auto-increment via 
ALTER TABLE.");
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported table change type: " + change.getClass().getName());
+      }
+    }
+
+    // Filter out empty strings and check if there are any actual changes
+    alterSql.removeIf(String::isEmpty);
+    if (alterSql.isEmpty()) {
+      return "";
+    }
+
+    // Return the generated SQL statement
+    String result = String.join("\n", alterSql);
+    LOG.info("Generated alter table:{}.{} sql: {}", schemaName, tableName, 
result);
+    return result;
+  }
+
+  private String updateCommentDefinition(
+      TableChange.UpdateComment updateComment, JdbcTable jdbcTable) {
+    String newComment = updateComment.getNewComment();
+    if (null == StringIdentifier.fromComment(newComment)) {
+      // Detect and add Gravitino id.
+      if (StringUtils.isNotEmpty(jdbcTable.comment())) {
+        StringIdentifier identifier = 
StringIdentifier.fromComment(jdbcTable.comment());
+        if (null != identifier) {
+          newComment = StringIdentifier.addToComment(identifier, newComment);
+        }
+      }
+    }
+    return String.format(
+        "COMMENT ON TABLE %s IS '%s';",
+        quoteIdentifier(jdbcTable.name()), newComment.replace("'", "''"));
+  }
+
+  private String deleteColumnFieldDefinition(
+      TableChange.DeleteColumn deleteColumn, JdbcTable table) {
+    if (deleteColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+    String col = deleteColumn.fieldName()[0];
+    boolean colExists =
+        Arrays.stream(table.columns()).anyMatch(s -> StringUtils.equals(col, 
s.name()));
+    if (!colExists) {
+      if (BooleanUtils.isTrue(deleteColumn.getIfExists())) {
+        return "";
+      } else {
+        throw new IllegalArgumentException("Delete column does not exist: " + 
col);
+      }
+    }
+    return String.format(
+        "%s%s DROP COLUMN %s;",
+        ALTER_TABLE, quoteIdentifier(table.name()), 
quoteIdentifier(deleteColumn.fieldName()[0]));
+  }
+
+  private String renameColumnFieldDefinition(
+      TableChange.RenameColumn renameColumn, String tableName) {
+    if (renameColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+    return String.format(
+        "%s%s RENAME COLUMN %s TO %s;",
+        ALTER_TABLE,
+        quoteIdentifier(tableName),
+        quoteIdentifier(renameColumn.fieldName()[0]),
+        quoteIdentifier(renameColumn.getNewName()));
+  }
+
+  private List<String> addColumnFieldDefinition(
+      TableChange.AddColumn addColumn, JdbcTable lazyLoadTable) {
+    if (addColumn.fieldName().length > 1) {
+      throw new 
UnsupportedOperationException(HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+
+    // Hologres does not support setting nullable, default value, or 
auto-increment via ADD COLUMN
+    if (!addColumn.isNullable()) {
+      throw new IllegalArgumentException(
+          "Hologres does not support setting NOT NULL constraint when adding a column via ALTER TABLE.");
+    }
+    if (!Column.DEFAULT_VALUE_NOT_SET.equals(addColumn.getDefaultValue())) {
+      throw new IllegalArgumentException(
+          "Hologres does not support setting default value when adding a column via ALTER TABLE.");
+    }
+    if (addColumn.isAutoIncrement()) {
+      throw new IllegalArgumentException(
+          "Hologres does not support setting auto-increment when adding a column via ALTER TABLE.");
+    }
+
+    List<String> result = new ArrayList<>();
+    String col = addColumn.fieldName()[0];
+
+    String columnDefinition =
+        String.format(
+            "%s%s ADD COLUMN %s %s",
+            ALTER_TABLE,
+            quoteIdentifier(lazyLoadTable.name()),
+            quoteIdentifier(col),
+            typeConverter.fromGravitino(addColumn.getDataType()));
+
+    // Hologres does not support specifying a column position via ADD COLUMN
+    if (!(addColumn.getPosition() instanceof TableChange.Default)) {
+      throw new IllegalArgumentException("Hologres does not support column position in Gravitino.");
+    }
+    result.add(columnDefinition + ";");
+
+    // Append comment if available
+    if (StringUtils.isNotEmpty(addColumn.getComment())) {
+      String escapedComment = addColumn.getComment().replace("'", "''");
+      result.add(
+          String.format(
+              "COMMENT ON COLUMN %s.%s IS '%s';",
+              quoteIdentifier(lazyLoadTable.name()), quoteIdentifier(col), escapedComment));
+    }
+    return result;
+  }
+
+  private String updateColumnCommentFieldDefinition(
+      TableChange.UpdateColumnComment updateColumnComment, String tableName) {
+    String newComment = updateColumnComment.getNewComment();
+    if (updateColumnComment.fieldName().length > 1) {
+      throw new UnsupportedOperationException(HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG);
+    }
+    String col = updateColumnComment.fieldName()[0];
+    return String.format(
+        "COMMENT ON COLUMN %s.%s IS '%s';",
+        quoteIdentifier(tableName), quoteIdentifier(col), newComment.replace("'", "''"));
+  }
+
+  @Override
+  protected ResultSet getIndexInfo(String schemaName, String tableName, DatabaseMetaData metaData)
+      throws SQLException {
+    return metaData.getIndexInfo(database, schemaName, tableName, false, false);
+  }
+
+  @Override
+  protected ResultSet getPrimaryKeys(String schemaName, String tableName, DatabaseMetaData metaData)
+      throws SQLException {
+    return metaData.getPrimaryKeys(database, schemaName, tableName);
+  }
+
+  @Override
+  protected Connection getConnection(String schema) throws SQLException {
+    Connection connection = dataSource.getConnection();
+    connection.setCatalog(database);
+    connection.setSchema(schema);
+    return connection;
+  }
+
+  /**
+   * Get tables from the database including regular tables and partitioned parent tables.
+   *
+   * <p>In Hologres (PostgreSQL-compatible):
+   *
+   * <ul>
+   *   <li>Regular tables and partition child tables have TABLE_TYPE = "TABLE"
+   *   <li>Partitioned parent tables have TABLE_TYPE = "PARTITIONED TABLE"
+   *   <li>Views have TABLE_TYPE = "VIEW" (excluded from listing)
+   *   <li>Foreign tables have TABLE_TYPE = "FOREIGN TABLE" (excluded from listing)
+   * </ul>
+   *
+   * <p>This method overrides the parent to include regular tables and partition parent tables, but
+   * excludes views and foreign tables from the table list.
+   *
+   * @param connection the database connection
+   * @return ResultSet containing table metadata
+   * @throws SQLException if a database access error occurs
+   */
+  @Override
+  protected ResultSet getTables(Connection connection) throws SQLException {
+    DatabaseMetaData metaData = connection.getMetaData();
+    String catalogName = connection.getCatalog();
+    String schemaName = connection.getSchema();
+    // Include "TABLE" (regular tables and partition children),
+    // and "PARTITIONED TABLE" (partition parent tables)
+    // Exclude "VIEW" and "FOREIGN TABLE" to hide views and foreign tables from Gravitino
+    return metaData.getTables(
+        catalogName, schemaName, null, new String[] {"TABLE", "PARTITIONED TABLE"});
+  }
+
+  @Override
+  protected ResultSet getTable(Connection connection, String schema, String tableName)
+      throws SQLException {
+    DatabaseMetaData metaData = connection.getMetaData();
+    // Include TABLE and PARTITIONED TABLE types
+    // Exclude VIEW and FOREIGN TABLE to hide views and foreign tables from Gravitino
+    return metaData.getTables(
+        database, schema, tableName, new String[] {"TABLE", "PARTITIONED TABLE"});
+  }
+
+  @Override
+  protected ResultSet getColumns(Connection connection, String schema, String tableName)
+      throws SQLException {
+    DatabaseMetaData metaData = connection.getMetaData();
+    return metaData.getColumns(database, schema, tableName, null);
+  }
+
+  /**
+   * Get distribution information from Hologres system table hologres.hg_table_properties.
+   *
+   * <p>In Hologres, distribution_key is stored as a table property with the property_key
+   * "distribution_key" and property_value as comma-separated column names (e.g., "col1,col2").
+   *
+   * <p>This method queries the system table and returns a HASH distribution with the specified
+   * columns. Hologres only supports HASH distribution strategy.
+   *
+   * @param connection the database connection
+   * @param databaseName the schema name
+   * @param tableName the table name
+   * @return the distribution info, or {@link Distributions#NONE} if no distribution_key is set
+   * @throws SQLException if a database access error occurs
+   */
+  @Override
+  protected Distribution getDistributionInfo(
+      Connection connection, String databaseName, String tableName) throws SQLException {
+    String schemaName = connection.getSchema();
+    String distributionSql =
+        "SELECT property_value "
+            + "FROM hologres.hg_table_properties "
+            + "WHERE table_namespace = ? AND table_name = ? AND property_key = 'distribution_key'";
+
+    try (PreparedStatement statement = connection.prepareStatement(distributionSql)) {
+      statement.setString(1, schemaName);
+      statement.setString(2, tableName);
+
+      try (ResultSet resultSet = statement.executeQuery()) {
+        if (resultSet.next()) {
+          String distributionKey = resultSet.getString("property_value");
+          if (StringUtils.isNotEmpty(distributionKey)) {
+            NamedReference[] columns =
+                Arrays.stream(distributionKey.split(","))
+                    .map(String::trim)
+                    .filter(StringUtils::isNotEmpty)
+                    .map(NamedReference::field)
+                    .toArray(NamedReference[]::new);
+            if (columns.length > 0) {
+              return Distributions.hash(0, columns);
+            }
+          }
+        }
+      }
+    }
+    return Distributions.NONE;
+  }
+
+  /**
+   * Validate the distribution for Hologres.
+   *
+   * <p>Hologres only supports HASH distribution strategy.
+   *
+   * @param distribution the distribution to validate
+   */
+  private void validateDistribution(Distribution distribution) {
+    Preconditions.checkArgument(
+        distribution.strategy() == Strategy.HASH,
+        "Hologres only supports HASH distribution strategy, but got: %s",
+        distribution.strategy());
+    Preconditions.checkArgument(
+        distribution.expressions().length > 0,
+        "Hologres HASH distribution requires at least one distribution column");
+  }
+
+  @Override
+  public Integer calculateDatetimePrecision(String typeName, int columnSize, int scale) {
+    String upperTypeName = typeName.toUpperCase();
+    switch (upperTypeName) {
+      case "TIME":
+      case "TIMETZ":
+      case "TIMESTAMP":
+      case "TIMESTAMPTZ":
+        return Math.max(scale, 0);
+      default:
+        return null;
+    }
+  }
+
+  /**
+   * Get table partitioning information from PostgreSQL system tables.
+   *
+   * <p>Hologres (PostgreSQL-compatible) only supports LIST partitioning. This method queries
+   * pg_partitioned_table and pg_attribute system tables to determine if a table is partitioned, and
+   * if so, returns the partition column names as a LIST transform.
+   *
+   * <p>The SQL queries follow the same approach used by Holo Client's ConnectionUtil:
+   *
+   * <ol>
+   *   <li>Query pg_partitioned_table to check if the table is partitioned and get partition column
+   *       attribute numbers (partattrs)
+   *   <li>Query pg_attribute to resolve attribute numbers to column names
+   * </ol>
+   *
+   * @param connection the database connection
+   * @param databaseName the schema name
+   * @param tableName the table name
+   * @return the partition transforms, or empty if the table is not partitioned
+   * @throws SQLException if a database access error occurs
+   */
+  @Override
+  protected Transform[] getTablePartitioning(
+      Connection connection, String databaseName, String tableName) throws SQLException {
+
+    // First, check if this is a logical partitioned table by querying table properties.
+    // Logical partition tables in Hologres (V3.1+) have the property
+    // "is_logical_partitioned_table" set to "true", and partition columns are stored in
+    // the "logical_partition_columns" property.
+    Transform[] logicalPartitioning = getLogicalPartitioning(connection, tableName);
+    if (logicalPartitioning.length > 0) {
+      return logicalPartitioning;
+    }
+
+    // Fall back to physical partition table check via pg_partitioned_table system table
+    return getPhysicalPartitioning(connection, databaseName, tableName);
+  }
+
+  /**
+   * Get logical partition information from Hologres table properties.
+   *
+   * <p>Hologres V3.1+ supports logical partition tables where the parent table is a physical table
+   * and child partitions are logical concepts. A logical partition table is identified by the
+   * property "is_logical_partitioned_table" = "true" in hologres.hg_table_properties, and its
+   * partition columns are stored in the "logical_partition_columns" property.
+   *
+   * @param connection the database connection
+   * @param tableName the table name
+   * @return the partition transforms, or empty if the table is not a logical partition table
+   * @throws SQLException if a database access error occurs
+   */
+  private Transform[] getLogicalPartitioning(Connection connection, String tableName)
+      throws SQLException {
+    String schemaName = connection.getSchema();
+    String logicalPartitionSql =
+        "SELECT property_key, property_value "
+            + "FROM hologres.hg_table_properties "
+            + "WHERE table_namespace = ? AND table_name = ? "
+            + "AND property_key IN ('is_logical_partitioned_table', 'logical_partition_columns')";
+
+    String isLogicalPartitioned = null;
+    String logicalPartitionColumns = null;
+
+    try (PreparedStatement statement = connection.prepareStatement(logicalPartitionSql)) {
+      statement.setString(1, schemaName);
+      statement.setString(2, tableName);
+
+      try (ResultSet resultSet = statement.executeQuery()) {
+        while (resultSet.next()) {
+          String key = resultSet.getString("property_key");
+          String value = resultSet.getString("property_value");
+          if ("is_logical_partitioned_table".equals(key)) {
+            isLogicalPartitioned = value;
+          } else if ("logical_partition_columns".equals(key)) {
+            logicalPartitionColumns = value;
+          }
+        }
+      }
+    }
+
+    if (!"true".equalsIgnoreCase(isLogicalPartitioned)
+        || StringUtils.isEmpty(logicalPartitionColumns)) {
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    // Parse partition column names (comma-separated, e.g., "col1" or "col1,col2")
+    String[][] fieldNames =
+        Arrays.stream(logicalPartitionColumns.split(","))
+            .map(String::trim)
+            .filter(StringUtils::isNotEmpty)
+            .map(col -> new String[] {col})
+            .toArray(String[][]::new);
+
+    if (fieldNames.length == 0) {
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    return new Transform[] {Transforms.list(fieldNames)};
+  }
+
+  /**
+   * Get physical partition information from PostgreSQL system tables.
+   *
+   * <p>Hologres (PostgreSQL-compatible) only supports LIST partitioning for physical partitions.
+   * This method queries pg_partitioned_table and pg_attribute system tables to determine if a table
+   * is partitioned, and if so, returns the partition column names as a LIST transform.
+   *
+   * @param connection the database connection
+   * @param databaseName the schema name
+   * @param tableName the table name
+   * @return the partition transforms, or empty if the table is not a physical partition table
+   * @throws SQLException if a database access error occurs
+   */
+  private Transform[] getPhysicalPartitioning(
+      Connection connection, String databaseName, String tableName) throws SQLException {
+
+    // Query pg_partitioned_table to get partition strategy and column attribute numbers
+    String partitionSql =
+        "SELECT part.partstrat, part.partnatts, part.partattrs "
+            + "FROM pg_catalog.pg_class c "
+            + "JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace "
+            + "JOIN pg_catalog.pg_partitioned_table part ON c.oid = part.partrelid "
+            + "WHERE n.nspname = ? AND c.relname = ? "
+            + "LIMIT 1";
+
+    String partStrategy = null;
+    String partAttrs = null;
+
+    try (PreparedStatement statement = connection.prepareStatement(partitionSql)) {
+      statement.setString(1, databaseName);
+      statement.setString(2, tableName);
+
+      try (ResultSet resultSet = statement.executeQuery()) {
+        if (resultSet.next()) {
+          partStrategy = resultSet.getString("partstrat");
+          partAttrs = resultSet.getString("partattrs");
+        }
+      }
+    }
+
+    // Not a partitioned table
+    if (partStrategy == null || partAttrs == null) {
+      return Transforms.EMPTY_TRANSFORM;
+    }
+
+    // Parse partition attribute numbers (e.g., "1" or "1 2")
+    String[] attrNums = partAttrs.trim().split("\\s+");
+    List<String[]> partitionColumnNames = new ArrayList<>();
+
+    // Resolve attribute numbers to column names
+    String attrSql =
+        "SELECT attname FROM pg_catalog.pg_attribute "
+            + "WHERE attrelid = (SELECT c.oid FROM pg_catalog.pg_class c "
+            + "  JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace "
+            + "  WHERE n.nspname = ? AND c.relname = ?) "
+            + "AND attnum = ?";
+
+    for (String attrNum : attrNums) {

Review Comment:
   **Performance:** `getPhysicalPartitioning` creates a separate PreparedStatement and queries `pg_attribute` once for each partition attribute. With multiple partition columns, this is an N+1 query pattern.
   
   Suggest batching this into a single `WHERE attnum IN (?, ?, ...)` query, or joining pg_attribute directly in the original SQL, to avoid multiple DB round-trips inside the loop.
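   The batched variant could look roughly like this; `buildAttrNameSql` is a hypothetical helper name, not part of this PR, and the sketch only builds the SQL string (binding and result mapping stay with the caller):
   
   ```java
   // Sketch of the suggested batched lookup: a single pg_attribute query with
   // "attnum IN (?, ?, ...)" instead of one query per partition attribute.
   // The caller binds schema name, table name, then each attnum, and maps the
   // returned (attnum, attname) rows back into the original partattrs order.
   public class BatchedAttrLookupSketch {
   
     static String buildAttrNameSql(int attrCount) {
       // One "?" placeholder per partition attribute number
       StringBuilder placeholders = new StringBuilder();
       for (int i = 0; i < attrCount; i++) {
         if (i > 0) {
           placeholders.append(", ");
         }
         placeholders.append("?");
       }
       return "SELECT attnum, attname FROM pg_catalog.pg_attribute "
           + "WHERE attrelid = (SELECT c.oid FROM pg_catalog.pg_class c "
           + "  JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace "
           + "  WHERE n.nspname = ? AND c.relname = ?) "
           + "AND attnum IN ("
           + placeholders
           + ")";
     }
   
     public static void main(String[] args) {
       // Two partition columns -> one round-trip with two attnum placeholders
       System.out.println(buildAttrNameSql(2));
     }
   }
   ```
   
   The JOIN alternative would instead add `JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid AND a.attnum = ANY(part.partattrs)` to the original query (the `= ANY` over an int2vector is the usual PostgreSQL catalog idiom), at the cost of reassembling the column order from partattrs in Java.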


