yuqi1129 commented on code in PR #9963: URL: https://github.com/apache/gravitino/pull/9963#discussion_r2852905139
########## docs/manage-relational-metadata-using-gravitino.md: ########## @@ -122,6 +123,7 @@ Currently, Gravitino supports the following catalog providers: | `jdbc-postgresql` | [PostgreSQL catalog property](./jdbc-postgresql-catalog.md#catalog-properties) | | `jdbc-doris` | [Doris catalog property](./jdbc-doris-catalog.md#catalog-properties) | | `jdbc-oceanbase` | [OceanBase catalog property](./jdbc-oceanbase-catalog.md#catalog-properties) | +| `jdbc-hologres` | [Hologres catalog property](./jdbc-hologres-catalog.md#catalog-properties) | Review Comment: Index.md should also be updated. ########## catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java: ########## @@ -0,0 +1,1088 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.hologres.operation; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import javax.sql.DataSource; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.StringIdentifier; +import org.apache.gravitino.catalog.jdbc.JdbcColumn; +import org.apache.gravitino.catalog.jdbc.JdbcTable; +import org.apache.gravitino.catalog.jdbc.config.JdbcConfig; +import org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter; +import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter; +import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter; +import org.apache.gravitino.catalog.jdbc.operation.DatabaseOperation; +import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations; +import org.apache.gravitino.catalog.jdbc.operation.RequireDatabaseOperation; +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.exceptions.NoSuchTableException; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.TableChange; +import org.apache.gravitino.rel.expressions.NamedReference; +import org.apache.gravitino.rel.expressions.UnparsedExpression; +import org.apache.gravitino.rel.expressions.distributions.Distribution; +import org.apache.gravitino.rel.expressions.distributions.Distributions; +import org.apache.gravitino.rel.expressions.distributions.Strategy; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.expressions.transforms.Transforms; +import org.apache.gravitino.rel.indexes.Index; + +/** + * Table operations for Hologres. + * + * <p>Hologres is PostgreSQL-compatible, so most table operations follow PostgreSQL conventions. + * However, Hologres has specific features like table properties (orientation, distribution_key, + * etc.) that are handled through the WITH clause in CREATE TABLE statements. + */ +public class HologresTableOperations extends JdbcTableOperations + implements RequireDatabaseOperation { + + public static final String HOLO_QUOTE = "\""; + public static final String NEW_LINE = "\n"; + public static final String ALTER_TABLE = "ALTER TABLE "; + public static final String ALTER_COLUMN = "ALTER COLUMN "; + public static final String IS = " IS '"; + public static final String COLUMN_COMMENT = "COMMENT ON COLUMN "; + public static final String TABLE_COMMENT = "COMMENT ON TABLE "; + + private static final String HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG = + "Hologres does not support nested column names."; + + /** Properties that are handled separately or read-only, excluded from the WITH clause. */ + private static final Set<String> EXCLUDED_TABLE_PROPERTIES = + ImmutableSet.of("distribution_key", "is_logical_partitioned_table", "primary_key"); + + private String database; + private HologresSchemaOperations schemaOperations; + + @Override + public void initialize( + DataSource dataSource, + JdbcExceptionConverter exceptionMapper, + JdbcTypeConverter jdbcTypeConverter, + JdbcColumnDefaultValueConverter jdbcColumnDefaultValueConverter, + Map<String, String> conf) { + super.initialize( + dataSource, exceptionMapper, jdbcTypeConverter, jdbcColumnDefaultValueConverter, conf); + database = new JdbcConfig(conf).getJdbcDatabase(); + Preconditions.checkArgument( + StringUtils.isNotBlank(database), + "The `jdbc-database` configuration item is mandatory in Hologres."); + } + + @Override + public void setDatabaseOperation(DatabaseOperation databaseOperation) { + this.schemaOperations = (HologresSchemaOperations) databaseOperation; + } + + @Override + public List<String> listTables(String schemaName) throws NoSuchSchemaException { + try (Connection connection = getConnection(schemaName)) { + if (!schemaOperations.schemaExists(connection, schemaName)) { + throw new NoSuchSchemaException("No such schema: %s", schemaName); + } + final List<String> names = Lists.newArrayList(); + try (ResultSet tables = getTables(connection)) { + while (tables.next()) { + if (Objects.equals(tables.getString("TABLE_SCHEM"), schemaName)) { + names.add(tables.getString("TABLE_NAME")); + } + } + } + LOG.info("Finished listing tables size {} for schema name {} ", names.size(), schemaName); + return names; + } catch (final SQLException se) { + throw this.exceptionMapper.toGravitinoException(se); + } + } + + @Override + protected JdbcTable.Builder getTableBuilder( + ResultSet tablesResult, String databaseName, String tableName) throws SQLException { + boolean found = false; + JdbcTable.Builder builder = null; + while (tablesResult.next() && !found) { + String tableNameInResult = tablesResult.getString("TABLE_NAME"); + String tableSchemaInResultLowerCase = tablesResult.getString("TABLE_SCHEM"); + if (Objects.equals(tableNameInResult, tableName) + && Objects.equals(tableSchemaInResultLowerCase, databaseName)) { + builder = getBasicJdbcTableInfo(tablesResult); + found = true; + } + } + + if (!found) { + throw new NoSuchTableException("Table %s does not exist in %s.", tableName, databaseName); + } + + return builder; + } + + @Override + protected JdbcColumn.Builder getColumnBuilder( + ResultSet columnsResult, String databaseName, String tableName) throws SQLException { + JdbcColumn.Builder builder = null; + if (Objects.equals(columnsResult.getString("TABLE_NAME"), tableName) + && Objects.equals(columnsResult.getString("TABLE_SCHEM"), databaseName)) { + builder = getBasicJdbcColumnInfo(columnsResult); + } + return builder; + } + + @Override + protected String generateCreateTableSql( + String tableName, + JdbcColumn[] columns, + String comment, + Map<String, String> properties, + Transform[] partitioning, + Distribution distribution, + Index[] indexes) { + boolean isLogicalPartition = + MapUtils.isNotEmpty(properties) + && "true".equalsIgnoreCase(properties.get("is_logical_partitioned_table")); + StringBuilder sqlBuilder = new StringBuilder(); + sqlBuilder Review Comment: Please change all code like this to "xxx %s".format(xxx) in this file to make it more readable. ########## catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java: ########## @@ -0,0 +1,1088 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.hologres.operation; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import javax.sql.DataSource; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.StringIdentifier; +import org.apache.gravitino.catalog.jdbc.JdbcColumn; +import org.apache.gravitino.catalog.jdbc.JdbcTable; +import org.apache.gravitino.catalog.jdbc.config.JdbcConfig; +import org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter; +import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter; +import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter; +import org.apache.gravitino.catalog.jdbc.operation.DatabaseOperation; +import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations; +import org.apache.gravitino.catalog.jdbc.operation.RequireDatabaseOperation; +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.exceptions.NoSuchTableException; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.TableChange; +import org.apache.gravitino.rel.expressions.NamedReference; +import org.apache.gravitino.rel.expressions.UnparsedExpression; +import org.apache.gravitino.rel.expressions.distributions.Distribution; +import org.apache.gravitino.rel.expressions.distributions.Distributions; +import org.apache.gravitino.rel.expressions.distributions.Strategy; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.expressions.transforms.Transforms; +import org.apache.gravitino.rel.indexes.Index; + +/** + * Table operations for Hologres. + * + * <p>Hologres is PostgreSQL-compatible, so most table operations follow PostgreSQL conventions. + * However, Hologres has specific features like table properties (orientation, distribution_key, + * etc.) that are handled through the WITH clause in CREATE TABLE statements. + */ +public class HologresTableOperations extends JdbcTableOperations + implements RequireDatabaseOperation { + + public static final String HOLO_QUOTE = "\""; + public static final String NEW_LINE = "\n"; + public static final String ALTER_TABLE = "ALTER TABLE "; + public static final String ALTER_COLUMN = "ALTER COLUMN "; + public static final String IS = " IS '"; + public static final String COLUMN_COMMENT = "COMMENT ON COLUMN "; + public static final String TABLE_COMMENT = "COMMENT ON TABLE "; + + private static final String HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG = + "Hologres does not support nested column names."; + + /** Properties that are handled separately or read-only, excluded from the WITH clause. */ + private static final Set<String> EXCLUDED_TABLE_PROPERTIES = + ImmutableSet.of("distribution_key", "is_logical_partitioned_table", "primary_key"); + + private String database; + private HologresSchemaOperations schemaOperations; + + @Override + public void initialize( + DataSource dataSource, + JdbcExceptionConverter exceptionMapper, + JdbcTypeConverter jdbcTypeConverter, + JdbcColumnDefaultValueConverter jdbcColumnDefaultValueConverter, + Map<String, String> conf) { + super.initialize( + dataSource, exceptionMapper, jdbcTypeConverter, jdbcColumnDefaultValueConverter, conf); + database = new JdbcConfig(conf).getJdbcDatabase(); + Preconditions.checkArgument( + StringUtils.isNotBlank(database), + "The `jdbc-database` configuration item is mandatory in Hologres."); + } + + @Override + public void setDatabaseOperation(DatabaseOperation databaseOperation) { + this.schemaOperations = (HologresSchemaOperations) databaseOperation; + } + + @Override + public List<String> listTables(String schemaName) throws NoSuchSchemaException { + try (Connection connection = getConnection(schemaName)) { + if (!schemaOperations.schemaExists(connection, schemaName)) { + throw new NoSuchSchemaException("No such schema: %s", schemaName); + } + final List<String> names = Lists.newArrayList(); + try (ResultSet tables = getTables(connection)) { + while (tables.next()) { + if (Objects.equals(tables.getString("TABLE_SCHEM"), schemaName)) { + names.add(tables.getString("TABLE_NAME")); + } + } + } + LOG.info("Finished listing tables size {} for schema name {} ", names.size(), schemaName); + return names; + } catch (final SQLException se) { + throw this.exceptionMapper.toGravitinoException(se); + } + } + + @Override + protected JdbcTable.Builder getTableBuilder( + ResultSet tablesResult, String databaseName, String tableName) throws SQLException { + boolean found = false; + JdbcTable.Builder builder = null; + while (tablesResult.next() && !found) { + String tableNameInResult = tablesResult.getString("TABLE_NAME"); + String tableSchemaInResultLowerCase = tablesResult.getString("TABLE_SCHEM"); + if (Objects.equals(tableNameInResult, tableName) + && Objects.equals(tableSchemaInResultLowerCase, databaseName)) { + builder = getBasicJdbcTableInfo(tablesResult); + found = true; + } + } + + if (!found) { + throw new NoSuchTableException("Table %s does not exist in %s.", tableName, databaseName); + } + + return builder; + } + + @Override + protected JdbcColumn.Builder getColumnBuilder( + ResultSet columnsResult, String databaseName, String tableName) throws SQLException { + JdbcColumn.Builder builder = null; + if (Objects.equals(columnsResult.getString("TABLE_NAME"), tableName) + && Objects.equals(columnsResult.getString("TABLE_SCHEM"), databaseName)) { + builder = getBasicJdbcColumnInfo(columnsResult); + } + return builder; + } + + @Override + protected String generateCreateTableSql( + String tableName, + JdbcColumn[] columns, + String comment, + Map<String, String> properties, + Transform[] partitioning, + Distribution distribution, + Index[] indexes) { + boolean isLogicalPartition = + MapUtils.isNotEmpty(properties) + && "true".equalsIgnoreCase(properties.get("is_logical_partitioned_table")); + StringBuilder sqlBuilder = new StringBuilder(); + sqlBuilder + .append("CREATE TABLE ") + .append(HOLO_QUOTE) + .append(tableName) + .append(HOLO_QUOTE) + .append(" (") + .append(NEW_LINE); + + // Add columns + for (int i = 0; i < columns.length; i++) { + JdbcColumn column = columns[i]; + sqlBuilder.append(" ").append(HOLO_QUOTE).append(column.name()).append(HOLO_QUOTE); + + appendColumnDefinition(column, sqlBuilder); + // Add a comma for the next column, unless it's the last one + if (i < columns.length - 1) { + sqlBuilder.append(",").append(NEW_LINE); + } + } + appendIndexesSql(indexes, sqlBuilder); + sqlBuilder.append(NEW_LINE).append(")"); + + // Append partitioning clause if specified + if (ArrayUtils.isNotEmpty(partitioning)) { + appendPartitioningSql(partitioning, isLogicalPartition, sqlBuilder); + } + + // Build WITH clause combining distribution and Hologres-specific table properties + // Supported properties: orientation, distribution_key, clustering_key, event_time_column, + // bitmap_columns, dictionary_encoding_columns, time_to_live_in_seconds, table_group, etc. + List<String> withEntries = new ArrayList<>(); + + // Add distribution_key from Distribution parameter + if (!Distributions.NONE.equals(distribution)) { + validateDistribution(distribution); + String distributionColumns = + Arrays.stream(distribution.expressions()) + .map(Object::toString) + .collect(Collectors.joining(",")); + withEntries.add("distribution_key = '" + distributionColumns + "'"); + } + + // Add user-specified properties (filter out read-only / internally-handled properties) + if (MapUtils.isNotEmpty(properties)) { + properties.forEach( + (key, value) -> { + if (!EXCLUDED_TABLE_PROPERTIES.contains(key)) { + withEntries.add(key + " = '" + value + "'"); + } + }); + } + + // Generate WITH clause + if (!withEntries.isEmpty()) { + sqlBuilder.append(NEW_LINE).append("WITH (").append(NEW_LINE); + sqlBuilder.append( + withEntries.stream() + .map(entry -> " " + entry) + .collect(Collectors.joining("," + NEW_LINE))); + sqlBuilder.append(NEW_LINE).append(")"); + } + + sqlBuilder.append(";"); + + // Add table comment if specified + if (StringUtils.isNotEmpty(comment)) { + sqlBuilder + .append(NEW_LINE) + .append( + String.format( + "%s%s%s%s%s%s';", + TABLE_COMMENT, + HOLO_QUOTE, + tableName, + HOLO_QUOTE, + IS, + comment.replace("'", "''"))); + } + Arrays.stream(columns) + .filter(jdbcColumn -> StringUtils.isNotEmpty(jdbcColumn.comment())) + .forEach( + jdbcColumn -> + sqlBuilder + .append(NEW_LINE) + .append( + String.format( + "%s%s%s%s.%s%s%s%s%s';", Review Comment: Is there a space between COLUMN_COMMENT and HOLO_QUOTE? ########## catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java: ########## @@ -0,0 +1,1088 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.hologres.operation; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import javax.sql.DataSource; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.StringIdentifier; +import org.apache.gravitino.catalog.jdbc.JdbcColumn; +import org.apache.gravitino.catalog.jdbc.JdbcTable; +import org.apache.gravitino.catalog.jdbc.config.JdbcConfig; +import org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter; +import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter; +import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter; +import org.apache.gravitino.catalog.jdbc.operation.DatabaseOperation; +import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations; +import org.apache.gravitino.catalog.jdbc.operation.RequireDatabaseOperation; +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.exceptions.NoSuchTableException; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.TableChange; +import org.apache.gravitino.rel.expressions.NamedReference; +import org.apache.gravitino.rel.expressions.UnparsedExpression; +import org.apache.gravitino.rel.expressions.distributions.Distribution; +import org.apache.gravitino.rel.expressions.distributions.Distributions; +import org.apache.gravitino.rel.expressions.distributions.Strategy; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.expressions.transforms.Transforms; +import org.apache.gravitino.rel.indexes.Index; + +/** + * Table operations for Hologres. + * + * <p>Hologres is PostgreSQL-compatible, so most table operations follow PostgreSQL conventions. + * However, Hologres has specific features like table properties (orientation, distribution_key, + * etc.) that are handled through the WITH clause in CREATE TABLE statements. + */ +public class HologresTableOperations extends JdbcTableOperations + implements RequireDatabaseOperation { + + public static final String HOLO_QUOTE = "\""; + public static final String NEW_LINE = "\n"; + public static final String ALTER_TABLE = "ALTER TABLE "; + public static final String ALTER_COLUMN = "ALTER COLUMN "; + public static final String IS = " IS '"; + public static final String COLUMN_COMMENT = "COMMENT ON COLUMN "; + public static final String TABLE_COMMENT = "COMMENT ON TABLE "; + + private static final String HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG = + "Hologres does not support nested column names."; + + /** Properties that are handled separately or read-only, excluded from the WITH clause. */ + private static final Set<String> EXCLUDED_TABLE_PROPERTIES = + ImmutableSet.of("distribution_key", "is_logical_partitioned_table", "primary_key"); + + private String database; + private HologresSchemaOperations schemaOperations; + + @Override + public void initialize( + DataSource dataSource, + JdbcExceptionConverter exceptionMapper, + JdbcTypeConverter jdbcTypeConverter, + JdbcColumnDefaultValueConverter jdbcColumnDefaultValueConverter, + Map<String, String> conf) { + super.initialize( + dataSource, exceptionMapper, jdbcTypeConverter, jdbcColumnDefaultValueConverter, conf); + database = new JdbcConfig(conf).getJdbcDatabase(); + Preconditions.checkArgument( + StringUtils.isNotBlank(database), + "The `jdbc-database` configuration item is mandatory in Hologres."); + } + + @Override + public void setDatabaseOperation(DatabaseOperation databaseOperation) { + this.schemaOperations = (HologresSchemaOperations) databaseOperation; + } + + @Override + public List<String> listTables(String schemaName) throws NoSuchSchemaException { + try (Connection connection = getConnection(schemaName)) { + if (!schemaOperations.schemaExists(connection, schemaName)) { + throw new NoSuchSchemaException("No such schema: %s", schemaName); + } + final List<String> names = Lists.newArrayList(); + try (ResultSet tables = getTables(connection)) { + while (tables.next()) { + if (Objects.equals(tables.getString("TABLE_SCHEM"), schemaName)) { + names.add(tables.getString("TABLE_NAME")); + } + } + } + LOG.info("Finished listing tables size {} for schema name {} ", names.size(), schemaName); + return names; + } catch (final SQLException se) { + throw this.exceptionMapper.toGravitinoException(se); + } + } + + @Override + protected JdbcTable.Builder getTableBuilder( + ResultSet tablesResult, String databaseName, String tableName) throws SQLException { + boolean found = false; + JdbcTable.Builder builder = null; + while (tablesResult.next() && !found) { + String tableNameInResult = tablesResult.getString("TABLE_NAME"); + String tableSchemaInResultLowerCase = tablesResult.getString("TABLE_SCHEM"); + if (Objects.equals(tableNameInResult, tableName) + && Objects.equals(tableSchemaInResultLowerCase, databaseName)) { + builder = getBasicJdbcTableInfo(tablesResult); + found = true; + } + } + + if (!found) { + throw new NoSuchTableException("Table %s does not exist in %s.", tableName, databaseName); + } + + return builder; + } + + @Override + protected JdbcColumn.Builder getColumnBuilder( + ResultSet columnsResult, String databaseName, String tableName) throws SQLException { + JdbcColumn.Builder builder = null; + if (Objects.equals(columnsResult.getString("TABLE_NAME"), tableName) + && Objects.equals(columnsResult.getString("TABLE_SCHEM"), databaseName)) { + builder = getBasicJdbcColumnInfo(columnsResult); + } + return builder; + } + + @Override + protected String generateCreateTableSql( + String tableName, + JdbcColumn[] columns, + String comment, + Map<String, String> properties, + Transform[] partitioning, + Distribution distribution, + Index[] indexes) { + boolean isLogicalPartition = + MapUtils.isNotEmpty(properties) + && "true".equalsIgnoreCase(properties.get("is_logical_partitioned_table")); + StringBuilder sqlBuilder = new StringBuilder(); + sqlBuilder + .append("CREATE TABLE ") + .append(HOLO_QUOTE) + .append(tableName) + .append(HOLO_QUOTE) + .append(" (") + .append(NEW_LINE); + + // Add columns + for (int i = 0; i < columns.length; i++) { + JdbcColumn column = columns[i]; + sqlBuilder.append(" ").append(HOLO_QUOTE).append(column.name()).append(HOLO_QUOTE); + + appendColumnDefinition(column, sqlBuilder); + // Add a comma for the next column, unless it's the last one + if (i < columns.length - 1) { + sqlBuilder.append(",").append(NEW_LINE); + } + } + appendIndexesSql(indexes, sqlBuilder); + sqlBuilder.append(NEW_LINE).append(")"); + + // Append partitioning clause if specified + if (ArrayUtils.isNotEmpty(partitioning)) { + appendPartitioningSql(partitioning, isLogicalPartition, sqlBuilder); + } + + // Build WITH clause combining distribution and Hologres-specific table properties + // Supported properties: orientation, distribution_key, clustering_key, event_time_column, + // bitmap_columns, dictionary_encoding_columns, time_to_live_in_seconds, table_group, etc. + List<String> withEntries = new ArrayList<>(); + + // Add distribution_key from Distribution parameter + if (!Distributions.NONE.equals(distribution)) { + validateDistribution(distribution); + String distributionColumns = + Arrays.stream(distribution.expressions()) + .map(Object::toString) + .collect(Collectors.joining(",")); + withEntries.add("distribution_key = '" + distributionColumns + "'"); + } + + // Add user-specified properties (filter out read-only / internally-handled properties) + if (MapUtils.isNotEmpty(properties)) { + properties.forEach( + (key, value) -> { + if (!EXCLUDED_TABLE_PROPERTIES.contains(key)) { + withEntries.add(key + " = '" + value + "'"); + } + }); + } + + // Generate WITH clause + if (!withEntries.isEmpty()) { + sqlBuilder.append(NEW_LINE).append("WITH (").append(NEW_LINE); + sqlBuilder.append( + withEntries.stream() + .map(entry -> " " + entry) + .collect(Collectors.joining("," + NEW_LINE))); + sqlBuilder.append(NEW_LINE).append(")"); + } + + sqlBuilder.append(";"); + + // Add table comment if specified + if (StringUtils.isNotEmpty(comment)) { + sqlBuilder + .append(NEW_LINE) + .append( + String.format( + "%s%s%s%s%s%s';", + TABLE_COMMENT, + HOLO_QUOTE, + tableName, + HOLO_QUOTE, + IS, + comment.replace("'", "''"))); + } + Arrays.stream(columns) + .filter(jdbcColumn -> StringUtils.isNotEmpty(jdbcColumn.comment())) + .forEach( + jdbcColumn -> + sqlBuilder + .append(NEW_LINE) + .append( + String.format( + "%s%s%s%s.%s%s%s%s%s';", + COLUMN_COMMENT, + HOLO_QUOTE, + tableName, + HOLO_QUOTE, + HOLO_QUOTE, + jdbcColumn.name(), + HOLO_QUOTE, + IS, + jdbcColumn.comment().replace("'", "''")))); + + // Return the generated SQL statement + String result = sqlBuilder.toString(); + + LOG.info("Generated create table:{} sql: {}", tableName, result); + return result; + } + + @VisibleForTesting + static void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) { + for (Index index : indexes) { + String fieldStr = getIndexFieldStr(index.fieldNames()); + sqlBuilder.append(",").append(NEW_LINE); + switch (index.type()) { + case PRIMARY_KEY: + if (StringUtils.isNotEmpty(index.name())) { Review Comment: So the name of the primary key can be customized and will not necessarily be limited to `PRIMARY` in Hologres? ########## catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java: ########## @@ -0,0 +1,1088 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.hologres.operation; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import javax.sql.DataSource; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.StringIdentifier; +import org.apache.gravitino.catalog.jdbc.JdbcColumn; +import org.apache.gravitino.catalog.jdbc.JdbcTable; +import org.apache.gravitino.catalog.jdbc.config.JdbcConfig; +import org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter; +import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter; +import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter; +import org.apache.gravitino.catalog.jdbc.operation.DatabaseOperation; +import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations; +import org.apache.gravitino.catalog.jdbc.operation.RequireDatabaseOperation; +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.exceptions.NoSuchTableException; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.TableChange; +import org.apache.gravitino.rel.expressions.NamedReference; +import org.apache.gravitino.rel.expressions.UnparsedExpression; +import org.apache.gravitino.rel.expressions.distributions.Distribution; +import org.apache.gravitino.rel.expressions.distributions.Distributions; +import org.apache.gravitino.rel.expressions.distributions.Strategy; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.expressions.transforms.Transforms; +import org.apache.gravitino.rel.indexes.Index; + +/** + * Table operations for Hologres. + * + * <p>Hologres is PostgreSQL-compatible, so most table operations follow PostgreSQL conventions. + * However, Hologres has specific features like table properties (orientation, distribution_key, + * etc.) that are handled through the WITH clause in CREATE TABLE statements. + */ +public class HologresTableOperations extends JdbcTableOperations + implements RequireDatabaseOperation { + + public static final String HOLO_QUOTE = "\""; + public static final String NEW_LINE = "\n"; + public static final String ALTER_TABLE = "ALTER TABLE "; + public static final String ALTER_COLUMN = "ALTER COLUMN "; + public static final String IS = " IS '"; + public static final String COLUMN_COMMENT = "COMMENT ON COLUMN "; + public static final String TABLE_COMMENT = "COMMENT ON TABLE "; + + private static final String HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG = + "Hologres does not support nested column names."; + + /** Properties that are handled separately or read-only, excluded from the WITH clause. */ + private static final Set<String> EXCLUDED_TABLE_PROPERTIES = + ImmutableSet.of("distribution_key", "is_logical_partitioned_table", "primary_key"); + + private String database; + private HologresSchemaOperations schemaOperations; + + @Override + public void initialize( + DataSource dataSource, + JdbcExceptionConverter exceptionMapper, + JdbcTypeConverter jdbcTypeConverter, + JdbcColumnDefaultValueConverter jdbcColumnDefaultValueConverter, + Map<String, String> conf) { + super.initialize( + dataSource, exceptionMapper, jdbcTypeConverter, jdbcColumnDefaultValueConverter, conf); + database = new JdbcConfig(conf).getJdbcDatabase(); + Preconditions.checkArgument( + StringUtils.isNotBlank(database), + "The `jdbc-database` configuration item is mandatory in Hologres."); + } + + @Override + public void setDatabaseOperation(DatabaseOperation databaseOperation) { + this.schemaOperations = (HologresSchemaOperations) databaseOperation; + } + + @Override + public List<String> listTables(String schemaName) throws NoSuchSchemaException { + try (Connection connection = getConnection(schemaName)) { + if (!schemaOperations.schemaExists(connection, schemaName)) { + throw new NoSuchSchemaException("No such schema: %s", schemaName); + } + final List<String> names = Lists.newArrayList(); + try (ResultSet tables = getTables(connection)) { + while (tables.next()) { + if (Objects.equals(tables.getString("TABLE_SCHEM"), schemaName)) { + names.add(tables.getString("TABLE_NAME")); + } + } + } + LOG.info("Finished listing tables size {} for schema name {} ", names.size(), schemaName); + return names; + } catch (final SQLException se) { + throw this.exceptionMapper.toGravitinoException(se); + } + } + + @Override + protected JdbcTable.Builder getTableBuilder( + ResultSet tablesResult, String databaseName, String tableName) throws SQLException { + boolean found = false; + JdbcTable.Builder builder = null; + while (tablesResult.next() && !found) { + String tableNameInResult = tablesResult.getString("TABLE_NAME"); + String tableSchemaInResultLowerCase = tablesResult.getString("TABLE_SCHEM"); + if (Objects.equals(tableNameInResult, tableName) + && Objects.equals(tableSchemaInResultLowerCase, databaseName)) { + builder = getBasicJdbcTableInfo(tablesResult); + found = true; + } + } + + if (!found) { + throw new NoSuchTableException("Table %s does not exist in %s.", tableName, databaseName); + } + + return builder; + } + + @Override + protected JdbcColumn.Builder getColumnBuilder( + ResultSet columnsResult, String databaseName, String tableName) throws SQLException { + JdbcColumn.Builder builder = null; + if (Objects.equals(columnsResult.getString("TABLE_NAME"), tableName) + && Objects.equals(columnsResult.getString("TABLE_SCHEM"), databaseName)) { + builder = getBasicJdbcColumnInfo(columnsResult); + } + return builder; + } + + @Override + protected String generateCreateTableSql( + String tableName, + JdbcColumn[] columns, + String comment, + Map<String, String> properties, + Transform[] partitioning, + Distribution distribution, + Index[] indexes) { + boolean isLogicalPartition = + MapUtils.isNotEmpty(properties) + && "true".equalsIgnoreCase(properties.get("is_logical_partitioned_table")); + StringBuilder sqlBuilder = new StringBuilder(); + sqlBuilder + .append("CREATE TABLE ") + .append(HOLO_QUOTE) + .append(tableName) + .append(HOLO_QUOTE) + .append(" (") + .append(NEW_LINE); + + // Add columns + for (int i = 0; i < columns.length; i++) { + JdbcColumn column = columns[i]; + sqlBuilder.append(" ").append(HOLO_QUOTE).append(column.name()).append(HOLO_QUOTE); + + appendColumnDefinition(column, sqlBuilder); + // Add a comma for the next column, unless it's the last one + if (i < columns.length - 1) { + sqlBuilder.append(",").append(NEW_LINE); + } + } + appendIndexesSql(indexes, sqlBuilder); + sqlBuilder.append(NEW_LINE).append(")"); + + // Append partitioning clause if specified + if (ArrayUtils.isNotEmpty(partitioning)) { + appendPartitioningSql(partitioning, isLogicalPartition, sqlBuilder); + } + + // Build WITH clause combining distribution and Hologres-specific table properties + // Supported properties: orientation, distribution_key, clustering_key, event_time_column, + // bitmap_columns, dictionary_encoding_columns, time_to_live_in_seconds, table_group, etc. + List<String> withEntries = new ArrayList<>(); + + // Add distribution_key from Distribution parameter + if (!Distributions.NONE.equals(distribution)) { + validateDistribution(distribution); + String distributionColumns = + Arrays.stream(distribution.expressions()) + .map(Object::toString) + .collect(Collectors.joining(",")); + withEntries.add("distribution_key = '" + distributionColumns + "'"); + } + + // Add user-specified properties (filter out read-only / internally-handled properties) + if (MapUtils.isNotEmpty(properties)) { + properties.forEach( + (key, value) -> { + if (!EXCLUDED_TABLE_PROPERTIES.contains(key)) { + withEntries.add(key + " = '" + value + "'"); + } + }); + } + + // Generate WITH clause + if (!withEntries.isEmpty()) { + sqlBuilder.append(NEW_LINE).append("WITH (").append(NEW_LINE); + sqlBuilder.append( + withEntries.stream() + .map(entry -> " " + entry) + .collect(Collectors.joining("," + NEW_LINE))); + sqlBuilder.append(NEW_LINE).append(")"); + } + + sqlBuilder.append(";"); + + // Add table comment if specified + if (StringUtils.isNotEmpty(comment)) { + sqlBuilder + .append(NEW_LINE) + .append( + String.format( + "%s%s%s%s%s%s';", + TABLE_COMMENT, + HOLO_QUOTE, + tableName, + HOLO_QUOTE, + IS, + comment.replace("'", "''"))); + } + Arrays.stream(columns) + .filter(jdbcColumn -> StringUtils.isNotEmpty(jdbcColumn.comment())) + .forEach( + jdbcColumn -> + sqlBuilder + .append(NEW_LINE) + .append( + String.format( + "%s%s%s%s.%s%s%s%s%s';", + COLUMN_COMMENT, + HOLO_QUOTE, + tableName, + HOLO_QUOTE, + HOLO_QUOTE, + jdbcColumn.name(), + HOLO_QUOTE, + IS, + jdbcColumn.comment().replace("'", "''")))); + + // Return the generated SQL statement + String result = sqlBuilder.toString(); + + LOG.info("Generated create table:{} sql: {}", tableName, result); + return result; + } + + @VisibleForTesting + static void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) { + for (Index index : indexes) { + String fieldStr = getIndexFieldStr(index.fieldNames()); + sqlBuilder.append(",").append(NEW_LINE); + switch (index.type()) { + case PRIMARY_KEY: + if (StringUtils.isNotEmpty(index.name())) { + sqlBuilder + .append("CONSTRAINT ") + .append(HOLO_QUOTE) + .append(index.name()) + .append(HOLO_QUOTE); + } + sqlBuilder.append(" PRIMARY KEY (").append(fieldStr).append(")"); + break; + case UNIQUE_KEY: + if (StringUtils.isNotEmpty(index.name())) { + sqlBuilder + .append("CONSTRAINT ") + .append(HOLO_QUOTE) + .append(index.name()) + .append(HOLO_QUOTE); + } + sqlBuilder.append(" UNIQUE (").append(fieldStr).append(")"); + break; + default: + throw new IllegalArgumentException("Hologres doesn't support index : " + index.type()); + } + } + } + + protected static String getIndexFieldStr(String[][] fieldNames) { + return Arrays.stream(fieldNames) + .map( + colNames -> { + if (colNames.length > 1) { + throw new IllegalArgumentException( + "Index does not support complex fields in Hologres"); + } + return HOLO_QUOTE + colNames[0] + HOLO_QUOTE; + }) + .collect(Collectors.joining(", ")); + } + + /** + * Append the partitioning clause to the CREATE TABLE SQL. + * + * <p>Hologres supports two types of partition tables: + * + * <ul> + * <li>Physical partition table: uses {@code PARTITION BY LIST(column)} syntax + * <li>Logical partition table (V3.1+): uses {@code LOGICAL PARTITION BY LIST(column1[, + * column2])} syntax + * </ul> + * + * @param partitioning the partition transforms (only LIST partitioning is supported) + * @param isLogicalPartition whether to create a logical partition table + * @param sqlBuilder the SQL builder to append to + */ + @VisibleForTesting + static void appendPartitioningSql( + Transform[] partitioning, boolean isLogicalPartition, StringBuilder sqlBuilder) { + Preconditions.checkArgument( + partitioning.length == 1 && partitioning[0] instanceof Transforms.ListTransform, + "Hologres only supports LIST partitioning"); Review Comment: If the length of `partitioning` is not 1, the error message should not be `Hologres only supports LIST partitioning`. ########## docs/jdbc-hologres-catalog.md: ########## @@ -0,0 +1,274 @@ +--- +title: "Hologres catalog" +slug: /jdbc-hologres-catalog +keywords: +- jdbc +- Hologres +- metadata +license: "This software is licensed under the Apache License version 2." +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + +## Introduction + +Apache Gravitino provides the ability to manage [Hologres](https://help.aliyun.com/zh/hologres) metadata. + +Hologres is a real-time data warehouse service provided by Alibaba Cloud, designed for high-concurrency and low-latency online analytical processing (OLAP). Hologres is fully compatible with the PostgreSQL protocol and uses the PostgreSQL JDBC Driver for connections. + +:::caution +Gravitino saves some system information in schema and table comment, like `(From Gravitino, DO NOT EDIT: gravitino.v1.uid1078334182909406185)`, please don't change or remove this message. +::: + +## Catalog + +### Catalog capabilities + +- Gravitino catalog corresponds to a Hologres database instance. +- Supports metadata management of Hologres. +- Supports DDL operation for Hologres schemas and tables. +- Supports table index (PRIMARY KEY in CREATE TABLE). +- Supports [column default value](./manage-relational-metadata-using-gravitino.md#table-column-default-value). +- Supports LIST partitioning (physical and logical partition tables). +- Supports Hologres-specific table properties via `WITH` clause (orientation, clustering_key, distribution_key, etc.). +- Does not support [auto-increment](./manage-relational-metadata-using-gravitino.md#table-column-auto-increment). + +### Catalog properties + +You can pass to a Hologres data source any property that isn't defined by Gravitino by adding `gravitino.bypass.` prefix as a catalog property. For example, catalog property `gravitino.bypass.maxWaitMillis` will pass `maxWaitMillis` to the data source property. + +Check the relevant data source configuration in [data source properties](https://commons.apache.org/proper/commons-dbcp/configuration.html) + +If you use a JDBC catalog, you must provide `jdbc-url`, `jdbc-driver`, `jdbc-database`, `jdbc-user` and `jdbc-password` to catalog properties. +Besides the [common catalog properties](./gravitino-server-config.md#apache-gravitino-catalog-properties-configuration), the Hologres catalog has the following properties: + +| Configuration item | Description | Default value | Required | Since Version | +|-------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|----------|------------------| +| `jdbc-url` | JDBC URL for connecting to the database. For example, `jdbc:postgresql://hgprecn-cn-xxx.hologres.aliyuncs.com:80/my_database` | (none) | Yes | 0.9.0-incubating | +| `jdbc-driver` | The driver of the JDBC connection. Must be `org.postgresql.Driver`. | (none) | Yes | 0.9.0-incubating | +| `jdbc-database` | The database name. This is mandatory for Hologres. | (none) | Yes | 0.9.0-incubating | +| `jdbc-user` | The JDBC user name (AccessKey ID or database username). | (none) | Yes | 0.9.0-incubating | +| `jdbc-password` | The JDBC password (AccessKey Secret or database password). | (none) | Yes | 0.9.0-incubating | +| `jdbc.pool.min-size` | The minimum number of connections in the pool. `2` by default. | `2` | No | 0.9.0-incubating | +| `jdbc.pool.max-size` | The maximum number of connections in the pool. `10` by default. | `10` | No | 0.9.0-incubating | + +:::caution +Hologres uses the PostgreSQL JDBC Driver (version 42.3.2 or later recommended). Since the PostgreSQL JDBC Driver is already bundled with the Hologres catalog, you don't need to download it separately. +::: + +### Catalog operations + +Refer to [Manage Relational Metadata Using Gravitino](./manage-relational-metadata-using-gravitino.md#catalog-operations) for more details. + +## Schema + +### Schema capabilities + +- Gravitino's schema concept corresponds to the Hologres (PostgreSQL) schema. +- Supports creating schema with comment. +- Supports dropping schema. +- System schemas are automatically filtered: `pg_toast`, `pg_catalog`, `information_schema`, `hologres`, `hg_internal`, `hg_recyclebin`, `hologres_object_table`, `hologres_sample`, `hologres_streaming_mv`, `hologres_statistic`. + +### Schema properties + +- Doesn't support any schema property settings. + +### Schema operations + +Refer to [Manage Relational Metadata Using Gravitino](./manage-relational-metadata-using-gravitino.md#schema-operations) for more details. + +## Table + +### Table capabilities + +- Gravitino's table concept corresponds to the Hologres table. +- Supports DDL operation for Hologres tables. +- Supports PRIMARY KEY index in CREATE TABLE. +- Supports [column default value](./manage-relational-metadata-using-gravitino.md#table-column-default-value). +- Supports generated (stored computed) columns. +- Supports LIST partitioning (physical and logical). +- Does not support [auto-increment](./manage-relational-metadata-using-gravitino.md#table-column-auto-increment). Creating auto-increment columns is rejected in both CREATE TABLE and ALTER TABLE. + +### Table properties + +Hologres-specific table properties are set via the `WITH` clause during CREATE TABLE and read from the `hologres.hg_table_properties` system table. The following user-relevant properties are supported: + +| Property Key | Description | Example Value | +|-------------------------------------|-----------------------------------|------------------| Review Comment: Please align the table. ########## catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresTableOperations.java: ########## @@ -0,0 +1,1088 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.hologres.operation; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import javax.sql.DataSource; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.StringIdentifier; +import org.apache.gravitino.catalog.jdbc.JdbcColumn; +import org.apache.gravitino.catalog.jdbc.JdbcTable; +import org.apache.gravitino.catalog.jdbc.config.JdbcConfig; +import org.apache.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter; +import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter; +import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter; +import org.apache.gravitino.catalog.jdbc.operation.DatabaseOperation; +import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations; +import org.apache.gravitino.catalog.jdbc.operation.RequireDatabaseOperation; +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.exceptions.NoSuchTableException; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.TableChange; +import org.apache.gravitino.rel.expressions.NamedReference; +import org.apache.gravitino.rel.expressions.UnparsedExpression; +import org.apache.gravitino.rel.expressions.distributions.Distribution; +import org.apache.gravitino.rel.expressions.distributions.Distributions; +import org.apache.gravitino.rel.expressions.distributions.Strategy; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.expressions.transforms.Transforms; +import org.apache.gravitino.rel.indexes.Index; + +/** + * Table operations for Hologres. + * + * <p>Hologres is PostgreSQL-compatible, so most table operations follow PostgreSQL conventions. + * However, Hologres has specific features like table properties (orientation, distribution_key, + * etc.) that are handled through the WITH clause in CREATE TABLE statements. + */ +public class HologresTableOperations extends JdbcTableOperations + implements RequireDatabaseOperation { + + public static final String HOLO_QUOTE = "\""; + public static final String NEW_LINE = "\n"; + public static final String ALTER_TABLE = "ALTER TABLE "; + public static final String ALTER_COLUMN = "ALTER COLUMN "; + public static final String IS = " IS '"; + public static final String COLUMN_COMMENT = "COMMENT ON COLUMN "; + public static final String TABLE_COMMENT = "COMMENT ON TABLE "; + + private static final String HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG = + "Hologres does not support nested column names."; + + /** Properties that are handled separately or read-only, excluded from the WITH clause. */ + private static final Set<String> EXCLUDED_TABLE_PROPERTIES = + ImmutableSet.of("distribution_key", "is_logical_partitioned_table", "primary_key"); + + private String database; + private HologresSchemaOperations schemaOperations; + + @Override + public void initialize( + DataSource dataSource, + JdbcExceptionConverter exceptionMapper, + JdbcTypeConverter jdbcTypeConverter, + JdbcColumnDefaultValueConverter jdbcColumnDefaultValueConverter, + Map<String, String> conf) { + super.initialize( + dataSource, exceptionMapper, jdbcTypeConverter, jdbcColumnDefaultValueConverter, conf); + database = new JdbcConfig(conf).getJdbcDatabase(); + Preconditions.checkArgument( + StringUtils.isNotBlank(database), + "The `jdbc-database` configuration item is mandatory in Hologres."); + } + + @Override + public void setDatabaseOperation(DatabaseOperation databaseOperation) { + this.schemaOperations = (HologresSchemaOperations) databaseOperation; + } + + @Override + public List<String> listTables(String schemaName) throws NoSuchSchemaException { + try (Connection connection = getConnection(schemaName)) { + if (!schemaOperations.schemaExists(connection, schemaName)) { + throw new NoSuchSchemaException("No such schema: %s", schemaName); + } + final List<String> names = Lists.newArrayList(); + try (ResultSet tables = getTables(connection)) { + while (tables.next()) { + if (Objects.equals(tables.getString("TABLE_SCHEM"), schemaName)) { + names.add(tables.getString("TABLE_NAME")); + } + } + } + LOG.info("Finished listing tables size {} for schema name {} ", names.size(), schemaName); + return names; + } catch (final SQLException se) { + throw this.exceptionMapper.toGravitinoException(se); + } + } + + @Override + protected JdbcTable.Builder getTableBuilder( + ResultSet tablesResult, String databaseName, String tableName) throws SQLException { + boolean found = false; + JdbcTable.Builder builder = null; + while (tablesResult.next() && !found) { + String tableNameInResult = tablesResult.getString("TABLE_NAME"); + String tableSchemaInResultLowerCase = tablesResult.getString("TABLE_SCHEM"); + if (Objects.equals(tableNameInResult, tableName) + && Objects.equals(tableSchemaInResultLowerCase, databaseName)) { + builder = getBasicJdbcTableInfo(tablesResult); + found = true; + } + } + + if (!found) { + throw new NoSuchTableException("Table %s does not exist in %s.", tableName, databaseName); + } + + return builder; + } + + @Override + protected JdbcColumn.Builder getColumnBuilder( + ResultSet columnsResult, String databaseName, String tableName) throws SQLException { + JdbcColumn.Builder builder = null; + if (Objects.equals(columnsResult.getString("TABLE_NAME"), tableName) + && Objects.equals(columnsResult.getString("TABLE_SCHEM"), databaseName)) { + builder = getBasicJdbcColumnInfo(columnsResult); + } + return builder; + } + + @Override + protected String generateCreateTableSql( + String tableName, + JdbcColumn[] columns, + String comment, + Map<String, String> properties, + Transform[] partitioning, + Distribution distribution, + Index[] indexes) { + boolean isLogicalPartition = + MapUtils.isNotEmpty(properties) + && "true".equalsIgnoreCase(properties.get("is_logical_partitioned_table")); + StringBuilder sqlBuilder = new StringBuilder(); + sqlBuilder + .append("CREATE TABLE ") + .append(HOLO_QUOTE) + .append(tableName) + .append(HOLO_QUOTE) + .append(" (") + .append(NEW_LINE); + + // Add columns + for (int i = 0; i < columns.length; i++) { + JdbcColumn column = columns[i]; + sqlBuilder.append(" ").append(HOLO_QUOTE).append(column.name()).append(HOLO_QUOTE); + + appendColumnDefinition(column, sqlBuilder); + // Add a comma for the next column, unless it's the last one + if (i < columns.length - 1) { + sqlBuilder.append(",").append(NEW_LINE); + } + } + appendIndexesSql(indexes, sqlBuilder); + sqlBuilder.append(NEW_LINE).append(")"); + + // Append partitioning clause if specified + if (ArrayUtils.isNotEmpty(partitioning)) { + appendPartitioningSql(partitioning, isLogicalPartition, sqlBuilder); + } + + // Build WITH clause combining distribution and Hologres-specific table properties + // Supported properties: orientation, distribution_key, clustering_key, event_time_column, + // bitmap_columns, dictionary_encoding_columns, time_to_live_in_seconds, table_group, etc. + List<String> withEntries = new ArrayList<>(); + + // Add distribution_key from Distribution parameter + if (!Distributions.NONE.equals(distribution)) { + validateDistribution(distribution); + String distributionColumns = + Arrays.stream(distribution.expressions()) + .map(Object::toString) + .collect(Collectors.joining(",")); + withEntries.add("distribution_key = '" + distributionColumns + "'"); + } + + // Add user-specified properties (filter out read-only / internally-handled properties) + if (MapUtils.isNotEmpty(properties)) { + properties.forEach( + (key, value) -> { + if (!EXCLUDED_TABLE_PROPERTIES.contains(key)) { + withEntries.add(key + " = '" + value + "'"); + } + }); + } + + // Generate WITH clause + if (!withEntries.isEmpty()) { + sqlBuilder.append(NEW_LINE).append("WITH (").append(NEW_LINE); + sqlBuilder.append( + withEntries.stream() + .map(entry -> " " + entry) + .collect(Collectors.joining("," + NEW_LINE))); + sqlBuilder.append(NEW_LINE).append(")"); + } + + sqlBuilder.append(";"); + + // Add table comment if specified + if (StringUtils.isNotEmpty(comment)) { + sqlBuilder + .append(NEW_LINE) + .append( + String.format( + "%s%s%s%s%s%s';", + TABLE_COMMENT, + HOLO_QUOTE, + tableName, + HOLO_QUOTE, + IS, + comment.replace("'", "''"))); + } + Arrays.stream(columns) + .filter(jdbcColumn -> StringUtils.isNotEmpty(jdbcColumn.comment())) + .forEach( + jdbcColumn -> + sqlBuilder + .append(NEW_LINE) + .append( + String.format( + "%s%s%s%s.%s%s%s%s%s';", + COLUMN_COMMENT, + HOLO_QUOTE, + tableName, + HOLO_QUOTE, + HOLO_QUOTE, + jdbcColumn.name(), + HOLO_QUOTE, + IS, + jdbcColumn.comment().replace("'", "''")))); + + // Return the generated SQL statement + String result = sqlBuilder.toString(); + + LOG.info("Generated create table:{} sql: {}", tableName, result); + return result; + } + + @VisibleForTesting + static void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) { + for (Index index : indexes) { + String fieldStr = getIndexFieldStr(index.fieldNames()); + sqlBuilder.append(",").append(NEW_LINE); + switch (index.type()) { + case PRIMARY_KEY: + if (StringUtils.isNotEmpty(index.name())) { + sqlBuilder + .append("CONSTRAINT ") + .append(HOLO_QUOTE) + .append(index.name()) + .append(HOLO_QUOTE); + } + sqlBuilder.append(" PRIMARY KEY (").append(fieldStr).append(")"); + break; + case UNIQUE_KEY: + if (StringUtils.isNotEmpty(index.name())) { + sqlBuilder + .append("CONSTRAINT ") + .append(HOLO_QUOTE) + .append(index.name()) + .append(HOLO_QUOTE); + } + sqlBuilder.append(" UNIQUE (").append(fieldStr).append(")"); + break; + default: + throw new IllegalArgumentException("Hologres doesn't support index : " + index.type()); + } + } + } + + protected static String getIndexFieldStr(String[][] fieldNames) { + return Arrays.stream(fieldNames) + .map( + colNames -> { + if (colNames.length > 1) { + throw new IllegalArgumentException( + "Index does not support complex fields in Hologres"); + } + return HOLO_QUOTE + colNames[0] + HOLO_QUOTE; + }) + .collect(Collectors.joining(", ")); + } + + /** + * Append the partitioning clause to the CREATE TABLE SQL. + * + * <p>Hologres supports two types of partition tables: + * + * <ul> + * <li>Physical partition table: uses {@code PARTITION BY LIST(column)} syntax + * <li>Logical partition table (V3.1+): uses {@code LOGICAL PARTITION BY LIST(column1[, + * column2])} syntax + * </ul> + * + * @param partitioning the partition transforms (only LIST partitioning is supported) + * @param isLogicalPartition whether to create a logical partition table + * @param sqlBuilder the SQL builder to append to + */ + @VisibleForTesting + static void appendPartitioningSql( + Transform[] partitioning, boolean isLogicalPartition, StringBuilder sqlBuilder) { + Preconditions.checkArgument( + partitioning.length == 1 && partitioning[0] instanceof Transforms.ListTransform, + "Hologres only supports LIST partitioning"); + + Transforms.ListTransform listTransform = (Transforms.ListTransform) partitioning[0]; + String[][] fieldNames = listTransform.fieldNames(); + + Preconditions.checkArgument(fieldNames.length > 0, "Partition columns must not be empty"); + + if (isLogicalPartition) { + Preconditions.checkArgument( + fieldNames.length <= 2, + "Logical partition table supports at most 2 partition columns, but got: %s", + fieldNames.length); + } else { + Preconditions.checkArgument( + fieldNames.length == 1, + "Physical partition table supports exactly 1 partition column, but got: %s", + fieldNames.length); + } + + String partitionColumns = + Arrays.stream(fieldNames) + .map( + colNames -> { + Preconditions.checkArgument( + colNames.length == 1, + "Hologres partition does not support nested field names"); + return HOLO_QUOTE + colNames[0] + HOLO_QUOTE; + }) + .collect(Collectors.joining(", ")); + + sqlBuilder.append(NEW_LINE); + if (isLogicalPartition) { + sqlBuilder.append("LOGICAL PARTITION BY LIST(").append(partitionColumns).append(")"); + } else { + sqlBuilder.append("PARTITION BY LIST(").append(partitionColumns).append(")"); + } + } + + private void appendColumnDefinition(JdbcColumn column, StringBuilder sqlBuilder) { + // Add data type + sqlBuilder.append(SPACE).append(typeConverter.fromGravitino(column.dataType())).append(SPACE); + + // Hologres does not support auto-increment columns via Gravitino + if (column.autoIncrement()) { + throw new IllegalArgumentException( + "Hologres does not support creating auto-increment columns via Gravitino, column: " + + column.name()); + } + + // Handle generated (stored computed) columns: + // GENERATED ALWAYS AS (expr) STORED must come before nullable constraints. + if (column.defaultValue() instanceof UnparsedExpression) { + String expr = ((UnparsedExpression) column.defaultValue()).unparsedExpression(); + sqlBuilder.append("GENERATED ALWAYS AS (").append(expr).append(") STORED "); + if (column.nullable()) { + sqlBuilder.append("NULL "); + } else { + sqlBuilder.append("NOT NULL "); + } + return; + } + + // Add NOT NULL if the column is marked as such + if (column.nullable()) { + sqlBuilder.append("NULL "); + } else { + sqlBuilder.append("NOT NULL "); + } + // Add DEFAULT value if specified + appendDefaultValue(column, sqlBuilder); + } + + @Override + protected String generateRenameTableSql(String oldTableName, String newTableName) { + return String.format( + "%s%s%s%s RENAME TO %s%s%s", + ALTER_TABLE, HOLO_QUOTE, oldTableName, HOLO_QUOTE, HOLO_QUOTE, newTableName, HOLO_QUOTE); + } + + @Override + protected String generateDropTableSql(String tableName) { + return String.format("DROP TABLE %s%s%s", HOLO_QUOTE, tableName, HOLO_QUOTE); + } + + @Override + protected String generatePurgeTableSql(String tableName) { + throw new UnsupportedOperationException( + "Hologres does not support purge table in Gravitino, please use drop table"); + } + + @Override + protected String generateAlterTableSql( + String schemaName, String tableName, TableChange... changes) { + // Not all operations require the original table information, so lazy loading is used here + JdbcTable lazyLoadTable = null; + List<String> alterSql = new ArrayList<>(); + for (TableChange change : changes) { + if (change instanceof TableChange.UpdateComment) { + lazyLoadTable = getOrCreateTable(schemaName, tableName, lazyLoadTable); + alterSql.add(updateCommentDefinition((TableChange.UpdateComment) change, lazyLoadTable)); + } else if (change instanceof TableChange.SetProperty) { + throw new IllegalArgumentException("Set property is not supported yet"); + } else if (change instanceof TableChange.RemoveProperty) { + throw new IllegalArgumentException("Remove property is not supported yet"); + } else if (change instanceof TableChange.AddColumn) { + TableChange.AddColumn addColumn = (TableChange.AddColumn) change; + lazyLoadTable = getOrCreateTable(schemaName, tableName, lazyLoadTable); + alterSql.addAll(addColumnFieldDefinition(addColumn, lazyLoadTable)); + } else if (change instanceof TableChange.RenameColumn) { + TableChange.RenameColumn renameColumn = (TableChange.RenameColumn) change; + alterSql.add(renameColumnFieldDefinition(renameColumn, tableName)); + } else if (change instanceof TableChange.UpdateColumnDefaultValue) { + throw new IllegalArgumentException( + "Hologres does not support altering column default value via ALTER TABLE."); + } else if (change instanceof TableChange.UpdateColumnType) { + throw new IllegalArgumentException( + "Hologres does not support altering column type via ALTER TABLE."); + } else if (change instanceof TableChange.UpdateColumnComment) { + alterSql.add( + updateColumnCommentFieldDefinition( + (TableChange.UpdateColumnComment) change, tableName)); + } else if (change instanceof TableChange.UpdateColumnPosition) { + throw new IllegalArgumentException("Hologres does not support column position."); + } else if (change instanceof TableChange.DeleteColumn) { + lazyLoadTable = getOrCreateTable(schemaName, tableName, lazyLoadTable); + TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) change; + String deleteColSql = deleteColumnFieldDefinition(deleteColumn, lazyLoadTable); + if (StringUtils.isNotEmpty(deleteColSql)) { + alterSql.add(deleteColSql); + } + } else if (change instanceof TableChange.UpdateColumnNullability) { + throw new IllegalArgumentException( + "Hologres does not support altering column nullability via ALTER TABLE."); + } else if (change instanceof TableChange.AddIndex) { + throw new IllegalArgumentException( + "Hologres does not support adding index via ALTER TABLE."); + } else if (change instanceof TableChange.DeleteIndex) { + throw new IllegalArgumentException( + "Hologres does not support deleting index via ALTER TABLE."); + } else if (change instanceof TableChange.UpdateColumnAutoIncrement) { + throw new IllegalArgumentException( + "Hologres does not support altering column auto-increment via ALTER TABLE."); + } else { + throw new IllegalArgumentException( + "Unsupported table change type: " + change.getClass().getName()); + } + } + + // Filter out empty strings and check if there are any actual changes + alterSql.removeIf(String::isEmpty); + if (alterSql.isEmpty()) { + return ""; + } + + // Return the generated SQL statement + String result = String.join("\n", alterSql); + LOG.info("Generated alter table:{}.{} sql: {}", schemaName, tableName, result); + return result; + } + + private String updateCommentDefinition( + TableChange.UpdateComment updateComment, JdbcTable jdbcTable) { + String newComment = updateComment.getNewComment(); + if (null == StringIdentifier.fromComment(newComment)) { + // Detect and add Gravitino id. + if (StringUtils.isNotEmpty(jdbcTable.comment())) { + StringIdentifier identifier = StringIdentifier.fromComment(jdbcTable.comment()); + if (null != identifier) { + newComment = StringIdentifier.addToComment(identifier, newComment); + } + } + } + return String.format( + "%s%s%s%s%s%s';", TABLE_COMMENT, HOLO_QUOTE, jdbcTable.name(), HOLO_QUOTE, IS, newComment); + } + + private String deleteColumnFieldDefinition( + TableChange.DeleteColumn deleteColumn, JdbcTable table) { + if (deleteColumn.fieldName().length > 1) { + throw new UnsupportedOperationException(HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG); + } + String col = deleteColumn.fieldName()[0]; + boolean colExists = + Arrays.stream(table.columns()).anyMatch(s -> StringUtils.equals(col, s.name())); + if (!colExists) { + if (BooleanUtils.isTrue(deleteColumn.getIfExists())) { + return ""; + } else { + throw new IllegalArgumentException("Delete column does not exist: " + col); + } + } + return String.format( + "%s%s%s%s DROP COLUMN %s%s%s;", + ALTER_TABLE, + HOLO_QUOTE, + table.name(), + HOLO_QUOTE, + HOLO_QUOTE, + deleteColumn.fieldName()[0], + HOLO_QUOTE); + } + + private String renameColumnFieldDefinition( + TableChange.RenameColumn renameColumn, String tableName) { + if (renameColumn.fieldName().length > 1) { + throw new UnsupportedOperationException(HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG); + } + return String.format( + "%s%s RENAME COLUMN %s%s%s TO %s%s%s;", + ALTER_TABLE, + tableName, + HOLO_QUOTE, + renameColumn.fieldName()[0], + HOLO_QUOTE, + HOLO_QUOTE, + renameColumn.getNewName(), + HOLO_QUOTE); + } + + private List<String> addColumnFieldDefinition( + TableChange.AddColumn addColumn, JdbcTable lazyLoadTable) { + if (addColumn.fieldName().length > 1) { + throw new UnsupportedOperationException(HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG); + } + + // Hologres does not support setting nullable, default value, or auto-increment via ADD COLUMN + if (!addColumn.isNullable()) { + throw new IllegalArgumentException( + "Hologres does not support setting NOT NULL constraint when adding a column via ALTER TABLE."); + } + if (!Column.DEFAULT_VALUE_NOT_SET.equals(addColumn.getDefaultValue())) { + throw new IllegalArgumentException( + "Hologres does not support setting default value when adding a column via ALTER TABLE."); + } + if (addColumn.isAutoIncrement()) { + throw new IllegalArgumentException( + "Hologres does not support setting auto-increment when adding a column via ALTER TABLE."); + } + + List<String> result = new ArrayList<>(); + String col = addColumn.fieldName()[0]; + + String columnDefinition = + String.format( + "%s%s ADD COLUMN %s%s%s %s", + ALTER_TABLE, + lazyLoadTable.name(), + HOLO_QUOTE, + col, + HOLO_QUOTE, + typeConverter.fromGravitino(addColumn.getDataType())); + + // Append position if available + if (!(addColumn.getPosition() instanceof TableChange.Default)) { + throw new IllegalArgumentException("Hologres does not support column position in Gravitino."); + } + result.add(columnDefinition + ";"); + + // Append comment if available + if (StringUtils.isNotEmpty(addColumn.getComment())) { + result.add( + String.format( + "%s%s%s%s.%s%s%s%s%s';", + COLUMN_COMMENT, + HOLO_QUOTE, + lazyLoadTable.name(), + HOLO_QUOTE, + HOLO_QUOTE, + col, + HOLO_QUOTE, + IS, + addColumn.getComment())); + } + return result; + } + + private String updateColumnCommentFieldDefinition( + TableChange.UpdateColumnComment updateColumnComment, String tableName) { + String newComment = updateColumnComment.getNewComment(); + if (updateColumnComment.fieldName().length > 1) { + throw new UnsupportedOperationException(HOLOGRES_NOT_SUPPORT_NESTED_COLUMN_MSG); + } + String col = updateColumnComment.fieldName()[0]; + return String.format( + "%s%s%s%s.%s%s%s%s%s';", + COLUMN_COMMENT, + HOLO_QUOTE, + tableName, + HOLO_QUOTE, + HOLO_QUOTE, + col, + HOLO_QUOTE, + IS, + newComment); + } + + @Override + protected ResultSet getIndexInfo(String schemaName, String tableName, DatabaseMetaData metaData) + throws SQLException { + return metaData.getIndexInfo(database, schemaName, tableName, false, false); + } + + @Override + protected ResultSet getPrimaryKeys(String schemaName, String tableName, DatabaseMetaData metaData) + throws SQLException { + return metaData.getPrimaryKeys(database, schemaName, tableName); + } + + @Override + protected Connection getConnection(String schema) throws SQLException { + Connection connection = dataSource.getConnection(); + connection.setCatalog(database); + connection.setSchema(schema); + return connection; + } + + /** + * Get tables from the database including regular tables and partitioned parent tables. + * + * <p>In Hologres (PostgreSQL-compatible): + * + * <ul> + * <li>Regular tables and partition child tables have TABLE_TYPE = "TABLE" + * <li>Partitioned parent tables have TABLE_TYPE = "PARTITIONED TABLE" + * <li>Views have TABLE_TYPE = "VIEW" (excluded from listing) + * <li>Foreign tables have TABLE_TYPE = "FOREIGN TABLE" (excluded from listing) + * </ul> + * + * <p>This method overrides the parent to include regular tables and partition parent tables, but + * excludes views and foreign tables from the table list. + * + * @param connection the database connection + * @return ResultSet containing table metadata + * @throws SQLException if a database access error occurs + */ + @Override + protected ResultSet getTables(Connection connection) throws SQLException { + DatabaseMetaData metaData = connection.getMetaData(); + String catalogName = connection.getCatalog(); + String schemaName = connection.getSchema(); + // Include "TABLE" (regular tables and partition children), + // and "PARTITIONED TABLE" (partition parent tables) + // Exclude "VIEW" and "FOREIGN TABLE" to hide views and foreign tables from Gravitino + return metaData.getTables( + catalogName, schemaName, null, new String[] {"TABLE", "PARTITIONED TABLE"}); + } + + @Override + protected ResultSet getTable(Connection connection, String schema, String tableName) + throws SQLException { + DatabaseMetaData metaData = connection.getMetaData(); + // Include TABLE and PARTITIONED TABLE types + // Exclude VIEW and FOREIGN TABLE to hide views and foreign tables from Gravitino + return metaData.getTables( + database, schema, tableName, new String[] {"TABLE", "PARTITIONED TABLE"}); + } + + @Override + protected ResultSet getColumns(Connection connection, String schema, String tableName) + throws SQLException { + DatabaseMetaData metaData = connection.getMetaData(); + return metaData.getColumns(database, schema, tableName, null); + } + + /** + * Get distribution information from Hologres system table hologres.hg_table_properties. + * + * <p>In Hologres, distribution_key is stored as a table property with the property_key + * "distribution_key" and property_value as comma-separated column names (e.g., "col1,col2"). + * + * <p>This method queries the system table and returns a HASH distribution with the specified + * columns. Hologres only supports HASH distribution strategy. + * + * @param connection the database connection + * @param databaseName the schema name + * @param tableName the table name + * @return the distribution info, or {@link Distributions#NONE} if no distribution_key is set + * @throws SQLException if a database access error occurs + */ + @Override + protected Distribution getDistributionInfo( + Connection connection, String databaseName, String tableName) throws SQLException { + String schemaName = connection.getSchema(); + String distributionSql = + "SELECT property_value " + + "FROM hologres.hg_table_properties " + + "WHERE table_namespace = ? AND table_name = ? AND property_key = 'distribution_key'"; + + try (PreparedStatement statement = connection.prepareStatement(distributionSql)) { + statement.setString(1, schemaName); + statement.setString(2, tableName); + + try (ResultSet resultSet = statement.executeQuery()) { + if (resultSet.next()) { + String distributionKey = resultSet.getString("property_value"); + if (StringUtils.isNotEmpty(distributionKey)) { + NamedReference[] columns = + Arrays.stream(distributionKey.split(",")) + .map(String::trim) + .filter(StringUtils::isNotEmpty) + .map(NamedReference::field) + .toArray(NamedReference[]::new); + if (columns.length > 0) { + return Distributions.hash(0, columns); + } + } + } + } + } + return Distributions.NONE; + } + + /** + * Validate the distribution for Hologres. + * + * <p>Hologres only supports HASH distribution strategy. + * + * @param distribution the distribution to validate + */ + private void validateDistribution(Distribution distribution) { + Preconditions.checkArgument( + distribution.strategy() == Strategy.HASH, + "Hologres only supports HASH distribution strategy, but got: %s", + distribution.strategy()); + Preconditions.checkArgument( + distribution.expressions().length > 0, + "Hologres HASH distribution requires at least one distribution column"); + } + + @Override + public Integer calculateDatetimePrecision(String typeName, int columnSize, int scale) { + String upperTypeName = typeName.toUpperCase(); + switch (upperTypeName) { + case "TIME": + case "TIMETZ": + case "TIMESTAMP": + case "TIMESTAMPTZ": + return Math.max(scale, 0); + default: + return null; + } + } + + /** + * Get table partitioning information from PostgreSQL system tables. + * + * <p>Hologres (PostgreSQL-compatible) only supports LIST partitioning. This method queries + * pg_partitioned_table and pg_attribute system tables to determine if a table is partitioned, and + * if so, returns the partition column names as a LIST transform. + * + * <p>The SQL queries follow the same approach used by Holo Client's ConnectionUtil: + * + * <ol> + * <li>Query pg_partitioned_table to check if the table is partitioned and get partition column + * attribute numbers (partattrs) + * <li>Query pg_attribute to resolve attribute numbers to column names + * </ol> + * + * @param connection the database connection + * @param databaseName the schema name + * @param tableName the table name + * @return the partition transforms, or empty if the table is not partitioned + * @throws SQLException if a database access error occurs + */ + @Override + protected Transform[] getTablePartitioning( + Connection connection, String databaseName, String tableName) throws SQLException { + + // First, check if this is a logical partitioned table by querying table properties. + // Logical partition tables in Hologres (V3.1+) have the property + // "is_logical_partitioned_table" set to "true", and partition columns are stored in + // the "logical_partition_columns" property. + Transform[] logicalPartitioning = getLogicalPartitioning(connection, tableName); + if (logicalPartitioning.length > 0) { + return logicalPartitioning; + } + + // Fall back to physical partition table check via pg_partitioned_table system table + return getPhysicalPartitioning(connection, databaseName, tableName); + } + + /** + * Get logical partition information from Hologres table properties. + * + * <p>Hologres V3.1+ supports logical partition tables where the parent table is a physical table + * and child partitions are logical concepts. A logical partition table is identified by the + * property "is_logical_partitioned_table" = "true" in hologres.hg_table_properties, and its + * partition columns are stored in the "logical_partition_columns" property. + * + * @param connection the database connection + * @param tableName the table name + * @return the partition transforms, or empty if the table is not a logical partition table + * @throws SQLException if a database access error occurs + */ + private Transform[] getLogicalPartitioning(Connection connection, String tableName) + throws SQLException { + String schemaName = connection.getSchema(); + String logicalPartitionSql = + "SELECT property_key, property_value " + + "FROM hologres.hg_table_properties " + + "WHERE table_namespace = ? AND table_name = ? " + + "AND property_key IN ('is_logical_partitioned_table', 'logical_partition_columns')"; + + String isLogicalPartitioned = null; + String logicalPartitionColumns = null; + + try (PreparedStatement statement = connection.prepareStatement(logicalPartitionSql)) { + statement.setString(1, schemaName); + statement.setString(2, tableName); + + try (ResultSet resultSet = statement.executeQuery()) { + while (resultSet.next()) { + String key = resultSet.getString("property_key"); + String value = resultSet.getString("property_value"); + if ("is_logical_partitioned_table".equals(key)) { + isLogicalPartitioned = value; + } else if ("logical_partition_columns".equals(key)) { + logicalPartitionColumns = value; + } + } + } + } + + if (!"true".equalsIgnoreCase(isLogicalPartitioned) + || StringUtils.isEmpty(logicalPartitionColumns)) { + return Transforms.EMPTY_TRANSFORM; + } + + // Parse partition column names (comma-separated, e.g., "col1" or "col1,col2") + String[][] fieldNames = + Arrays.stream(logicalPartitionColumns.split(",")) + .map(String::trim) + .filter(StringUtils::isNotEmpty) + .map(col -> new String[] {col}) + .toArray(String[][]::new); + + if (fieldNames.length == 0) { + return Transforms.EMPTY_TRANSFORM; + } + + return new Transform[] {Transforms.list(fieldNames)}; + } + + /** + * Get physical partition information from PostgreSQL system tables. + * + * <p>Hologres (PostgreSQL-compatible) only supports LIST partitioning for physical partitions. + * This method queries pg_partitioned_table and pg_attribute system tables to determine if a table + * is partitioned, and if so, returns the partition column names as a LIST transform. + * + * @param connection the database connection + * @param databaseName the schema name + * @param tableName the table name + * @return the partition transforms, or empty if the table is not a physical partition table + * @throws SQLException if a database access error occurs + */ + private Transform[] getPhysicalPartitioning( + Connection connection, String databaseName, String tableName) throws SQLException { + + // Query pg_partitioned_table to get partition strategy and column attribute numbers + String partitionSql = + "SELECT part.partstrat, part.partnatts, part.partattrs " + + "FROM pg_catalog.pg_class c " + + "JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace " + + "JOIN pg_catalog.pg_partitioned_table part ON c.oid = part.partrelid " + + "WHERE n.nspname = ? AND c.relname = ? " + + "LIMIT 1"; + + String partStrategy = null; + String partAttrs = null; + + try (PreparedStatement statement = connection.prepareStatement(partitionSql)) { + statement.setString(1, databaseName); + statement.setString(2, tableName); + + try (ResultSet resultSet = statement.executeQuery()) { + if (resultSet.next()) { + partStrategy = resultSet.getString("partstrat"); + partAttrs = resultSet.getString("partattrs"); + } + } + } + + // Not a partitioned table + if (partStrategy == null || partAttrs == null) { + return Transforms.EMPTY_TRANSFORM; + } + + // Parse partition attribute numbers (e.g., "1" or "1 2") + String[] attrNums = partAttrs.trim().split("\\s+"); + List<String[]> partitionColumnNames = new ArrayList<>(); + + // Resolve attribute numbers to column names + String attrSql = + "SELECT attname FROM pg_catalog.pg_attribute " + + "WHERE attrelid = (SELECT c.oid FROM pg_catalog.pg_class c " + + " JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace " + + " WHERE n.nspname = ? AND c.relname = ?) " + + "AND attnum = ?"; + + for (String attrNum : attrNums) { + try (PreparedStatement statement = connection.prepareStatement(attrSql)) { + statement.setString(1, databaseName); + statement.setString(2, tableName); + statement.setInt(3, Integer.parseInt(attrNum)); + + try (ResultSet resultSet = statement.executeQuery()) { + if (resultSet.next()) { + partitionColumnNames.add(new String[] {resultSet.getString("attname")}); + } + } + } + } + + if (partitionColumnNames.isEmpty()) { + return Transforms.EMPTY_TRANSFORM; + } + + // Hologres only supports LIST partitioning (partstrat = 'l') + // Return a LIST transform with the partition column names + String[][] fieldNames = partitionColumnNames.toArray(new String[0][]); + return new Transform[] {Transforms.list(fieldNames)}; + } + + /** + * Get table properties from Hologres system table hologres.hg_table_properties. + * + * <p>This method queries the Hologres system table to retrieve table properties such as: + * + * <ul> + * <li>orientation: storage format (row/column/row,column) + * <li>clustering_key: clustering key columns + * <li>segment_key: event time column (segment key) + * <li>bitmap_columns: bitmap index columns + * <li>dictionary_encoding_columns: dictionary encoding columns + * <li>primary_key: primary key columns + * <li>time_to_live_in_seconds: TTL setting + * <li>table_group: table group name + * </ul> + * + * @param connection the database connection + * @param tableName the name of the table + * @return a map of table properties + * @throws SQLException if a database access error occurs + */ + @Override + protected Map<String, String> getTableProperties(Connection connection, String tableName) + throws SQLException { + Map<String, String> properties = new HashMap<>(); + String schemaName = connection.getSchema(); + + // Query table properties from hologres.hg_table_properties system table + // The system table stores each property as a separate row with property_key and property_value + String propertiesSql = + "SELECT property_key, property_value " + + "FROM hologres.hg_table_properties " + + "WHERE table_namespace = ? AND table_name = ?"; + + try (PreparedStatement statement = connection.prepareStatement(propertiesSql)) { + statement.setString(1, schemaName); + statement.setString(2, tableName); + + try (ResultSet resultSet = statement.executeQuery()) { + while (resultSet.next()) { + String propertyKey = resultSet.getString("property_key"); + String propertyValue = resultSet.getString("property_value"); + + // Only include meaningful properties that users care about + if (StringUtils.isNotEmpty(propertyValue) && isUserRelevantProperty(propertyKey)) { + // Convert JDBC property keys to DDL-compatible keys + // Hologres system table stores "binlog.level" and "binlog.ttl", + // but CREATE TABLE WITH clause uses "binlog_level" and "binlog_ttl" + String normalizedKey = convertFromJdbcPropertyKey(propertyKey); + properties.put(normalizedKey, propertyValue); + } + } + } + } + + LOG.debug("Loaded table properties for {}.{}: {}", schemaName, tableName, properties); + return properties; + } + + /** + * Convert JDBC property key to DDL-compatible property key. + * + * <p>Hologres system table {@code hologres.hg_table_properties} stores some property keys with + * dots (e.g., "binlog.level", "binlog.ttl"), but the CREATE TABLE WITH clause uses underscores + * (e.g., "binlog_level", "binlog_ttl"). This method converts from the JDBC format to the DDL + * format so that properties can be round-tripped correctly. + * + * @param jdbcKey the property key from the JDBC query + * @return the DDL-compatible property key + */ + private String convertFromJdbcPropertyKey(String jdbcKey) { + switch (jdbcKey) { + case "binlog.level": + return "binlog_level"; + case "binlog.ttl": + return "binlog_ttl"; + default: + return jdbcKey; + } + } + + /** + * Check if a property key is relevant for users to see. + * + * <p>This filters out internal system properties and only returns properties that are meaningful + * for users. + * + * @param propertyKey the property key to check + * @return true if the property is relevant for users + */ + private boolean isUserRelevantProperty(String propertyKey) { Review Comment: Use a map to replace it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
