This is an automated email from the ASF dual-hosted git repository.
yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 2a6e464d55 [#10072] feat(catalog-jdbc-hologres): Add schema operations
for Hologres catalog (#10067)
2a6e464d55 is described below
commit 2a6e464d5509c5132733eeb1b2c53a644503df04
Author: Ye Ding <[email protected]>
AuthorDate: Tue Mar 3 14:03:44 2026 +0800
[#10072] feat(catalog-jdbc-hologres): Add schema operations for Hologres
catalog (#10067)
### What changes were proposed in this pull request?
Replace the stub `HologresSchemaOperations` with the full
implementation, supporting:
- Schema initialization with JDBC database configuration
- Load schema with comment retrieval via `pg_catalog`
- List schemas with system schema filtering (e.g., `pg_toast`,
`hologres`, `hg_internal`)
- Create schema with optional `COMMENT ON SCHEMA` support
- Drop schema with cascade option
- Custom connection management with catalog setting
### Why are the changes needed?
Enables Gravitino to manage Hologres schemas (PostgreSQL-style schemas
within a database).
Fix: https://github.com/apache/gravitino/issues/10072
### Does this PR introduce _any_ user-facing change?
No. The Hologres catalog is not yet documented or released.
### How was this patch tested?
- Compilation verified: `./gradlew
:catalogs-contrib:catalog-jdbc-hologres:compileJava`
- Existing unit tests pass: `./gradlew
:catalogs-contrib:catalog-jdbc-hologres:test -PskipITs`
---
.../operation/HologresSchemaOperations.java | 164 ++++++++++++++++++++-
1 file changed, 162 insertions(+), 2 deletions(-)
diff --git
a/catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresSchemaOperations.java
b/catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresSchemaOperations.java
index c179f70b0d..3622f53f2f 100644
---
a/catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresSchemaOperations.java
+++
b/catalogs-contrib/catalog-jdbc-hologres/src/main/java/org/apache/gravitino/catalog/hologres/operation/HologresSchemaOperations.java
@@ -18,20 +18,142 @@
*/
package org.apache.gravitino.catalog.hologres.operation;
+import static
org.apache.gravitino.catalog.hologres.operation.HologresTableOperations.HOLO_QUOTE;
+
import com.google.common.collect.ImmutableSet;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import javax.sql.DataSource;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.catalog.jdbc.JdbcSchema;
+import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
import org.apache.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.meta.AuditInfo;
/**
* Schema (Database) operations for Hologres.
*
* <p>Hologres uses the PostgreSQL schema concept where Database in PostgreSQL
corresponds to
* Catalog in JDBC, and Schema in PostgreSQL corresponds to Schema in JDBC.
- *
- * <p>TODO: Full implementation will be added in a follow-up PR.
*/
public class HologresSchemaOperations extends JdbcDatabaseOperations {
+ private String database;
+
+ @Override
+ public void initialize(
+ DataSource dataSource, JdbcExceptionConverter exceptionMapper,
Map<String, String> conf) {
+ super.initialize(dataSource, exceptionMapper, conf);
+ database = new JdbcConfig(conf).getJdbcDatabase();
+ }
+
+ @Override
+ public JdbcSchema load(String schema) throws NoSuchSchemaException {
+ try (Connection connection = getConnection()) {
+ if (!schemaExists(connection, schema)) {
+ throw new NoSuchSchemaException("No such schema: %s", schema);
+ }
+
+ String comment = getSchemaComment(schema, connection);
+ return JdbcSchema.builder()
+ .withName(schema)
+ .withComment(comment)
+ .withAuditInfo(AuditInfo.EMPTY)
+ .withProperties(Collections.emptyMap())
+ .build();
+ } catch (SQLException e) {
+ throw exceptionMapper.toGravitinoException(e);
+ }
+ }
+
+ @Override
+ public List<String> listDatabases() {
+ List<String> result = new ArrayList<>();
+ try (Connection connection = getConnection();
+ ResultSet resultSet = getSchema(connection, null)) {
+ while (resultSet.next()) {
+ String schemaName = resultSet.getString(1);
+ if (!isSystemDatabase(schemaName)) {
+ result.add(resultSet.getString(1));
+ }
+ }
+ } catch (final SQLException se) {
+ throw this.exceptionMapper.toGravitinoException(se);
+ }
+ return result;
+ }
+
+ @Override
+ public String generateCreateDatabaseSql(
+ String schema, String comment, Map<String, String> properties) {
+ if (MapUtils.isNotEmpty(properties)) {
+ throw new UnsupportedOperationException(
+ "Hologres does not support properties on schema create.");
+ }
+
+ StringBuilder sqlBuilder =
+ new StringBuilder(String.format("CREATE SCHEMA %s%s%s;", HOLO_QUOTE,
schema, HOLO_QUOTE));
+ if (StringUtils.isNotEmpty(comment)) {
+ String escapedComment = comment.replace("'", "''");
+ sqlBuilder.append(
+ String.format(
+ "COMMENT ON SCHEMA %s%s%s IS '%s'", HOLO_QUOTE, schema,
HOLO_QUOTE, escapedComment));
+ }
+ return sqlBuilder.toString();
+ }
+
+ /**
+ * Get the schema with the given name.
+ *
+ * <p>Note: This method will return a result set that may contain multiple
rows as the schemaName
+ * in `getSchemas` is a pattern. The result set will contain all schemas
that match the pattern.
+ *
+ * <p>Database in Hologres (PostgreSQL) corresponds to Catalog in JDBC.
Schema in Hologres
+ * corresponds to Schema in JDBC.
+ *
+ * @param connection the connection to the database
+ * @param schemaName the name of the schema
+ */
+ private ResultSet getSchema(Connection connection, String schemaName) throws
SQLException {
+ final DatabaseMetaData metaData = connection.getMetaData();
+ return metaData.getSchemas(database, schemaName);
+ }
+
+ @Override
+ public String generateDropDatabaseSql(String schema, boolean cascade) {
+ StringBuilder sqlBuilder =
+ new StringBuilder(String.format("DROP SCHEMA %s%s%s", HOLO_QUOTE,
schema, HOLO_QUOTE));
+ if (cascade) {
+ sqlBuilder.append(" CASCADE");
+ }
+ return sqlBuilder.toString();
+ }
+
+ @Override
+ protected Connection getConnection() throws SQLException {
+ Connection connection = dataSource.getConnection();
+ connection.setCatalog(database);
+ return connection;
+ }
+
+ @Override
+ protected String generateDatabaseExistSql(String databaseName) {
+ return String.format(
+ "SELECT n.datname FROM pg_catalog.pg_database n where n.datname='%s'",
databaseName);
+ }
+
@Override
protected boolean supportSchemaComment() {
return true;
@@ -53,4 +175,42 @@ public class HologresSchemaOperations extends
JdbcDatabaseOperations {
"hologres_streaming_mv",
"hologres_statistic");
}
+
+ /**
+ * Check if the schema exists in the database.
+ *
+ * @param connection the connection to the database
+ * @param schema the name of the schema
+ * @return true if the schema exists, false otherwise
+ */
+ public boolean schemaExists(Connection connection, String schema) throws
SQLException {
+ try (ResultSet resultSet = getSchema(connection, schema)) {
+ while (resultSet.next()) {
+ if (Objects.equals(resultSet.getString(1), schema)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private String getShowSchemaCommentSql(String schema) {
+ return String.format(
+ "SELECT obj_description(n.oid, 'pg_namespace') AS comment\n"
+ + "FROM pg_catalog.pg_namespace n\n"
+ + "WHERE n.nspname = '%s';\n",
+ schema);
+ }
+
+ private String getSchemaComment(String schema, Connection connection) throws
SQLException {
+ try (PreparedStatement preparedStatement =
+ connection.prepareStatement(getShowSchemaCommentSql(schema))) {
+ try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ if (resultSet.next()) {
+ return resultSet.getString("comment");
+ }
+ }
+ }
+ return null;
+ }
}