This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 6fdda220d0 [#10362] feat(catalog-jdbc-hologres): Add integration tests for Hologres JDBC catalog (#10364)
6fdda220d0 is described below

commit 6fdda220d06c875bad87c7d4f2ec72219f5c7c14
Author: Ye Ding <[email protected]>
AuthorDate: Thu Mar 12 18:52:09 2026 +0800

    [#10362] feat(catalog-jdbc-hologres): Add integration tests for Hologres JDBC catalog (#10364)
    
    ### What changes were proposed in this pull request?
    
    Add cloud-based integration tests for the Hologres JDBC catalog. Since
    Hologres is a managed cloud service on Alibaba Cloud, the tests connect
    to a real Hologres instance and are controlled by environment variables.
    
    New files:
    - `CatalogHologresIT.java`: Integration test class extending `BaseIT`,
    covering schema CRUD, table CRUD, type conversion (13 types), ALTER/DROP
    table, distribution key, table properties, primary key index, column
    default values, and schema comments.
    - `HologresService.java`: JDBC helper class for direct verification
    against the Hologres instance.
    - `README.md`: Documentation with environment variable configuration and
    test execution instructions.
    
    Modified files:
    - `CatalogManager.java`: Add `jdbc-hologres` to `CONTRIB_CATALOGS_TYPES`
    to ensure correct catalog path resolution in test environment.
    
    ### Why are the changes needed?
    
    The Hologres JDBC catalog only has unit tests for SQL generation and
    type conversion. It lacks end-to-end integration tests that verify the
    complete Gravitino workflow with a real Hologres instance. This PR fills
    that gap by following the cloud-service IT pattern used by
    `GravitinoVirtualFileSystemOSSIT`.
    
    Fix: https://github.com/apache/gravitino/issues/10362
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    - Compilation verified via `./gradlew
    :catalogs-contrib:catalog-jdbc-hologres:compileTestJava`
    - Existing unit tests pass via `./gradlew
    :catalogs-contrib:catalog-jdbc-hologres:test -PskipITs`
    - Integration tests can be run by setting `GRAVITINO_HOLOGRES_JDBC_URL`,
    `GRAVITINO_HOLOGRES_USERNAME`, `GRAVITINO_HOLOGRES_PASSWORD` and running
    `./gradlew :catalogs-contrib:catalog-jdbc-hologres:test`
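    The enablement condition above (all three variables set and non-blank) can be mirrored as a
    pre-flight shell check before invoking Gradle. A minimal sketch; the
    `hologres_is_configured` helper name is illustrative and not part of this PR:

```shell
# Hypothetical pre-flight check mirroring the test-enablement condition:
# the Hologres ITs run only when all three variables are set and non-blank.
hologres_is_configured() {
  [ -n "${GRAVITINO_HOLOGRES_JDBC_URL:-}" ] \
    && [ -n "${GRAVITINO_HOLOGRES_USERNAME:-}" ] \
    && [ -n "${GRAVITINO_HOLOGRES_PASSWORD:-}" ]
}

if hologres_is_configured; then
  echo "Hologres configured; run: ./gradlew :catalogs-contrib:catalog-jdbc-hologres:test"
else
  echo "Hologres not configured; integration tests will be skipped"
fi
```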
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 catalogs-contrib/catalog-jdbc-hologres/README.md   |  73 ++
 .../integration/test/CatalogHologresIT.java        | 952 +++++++++++++++++++++
 .../integration/test/service/HologresService.java  |  96 +++
 .../apache/gravitino/catalog/CatalogManager.java   |   2 +-
 4 files changed, 1122 insertions(+), 1 deletion(-)

diff --git a/catalogs-contrib/catalog-jdbc-hologres/README.md b/catalogs-contrib/catalog-jdbc-hologres/README.md
new file mode 100644
index 0000000000..6bca2e6faf
--- /dev/null
+++ b/catalogs-contrib/catalog-jdbc-hologres/README.md
@@ -0,0 +1,73 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Hologres JDBC Catalog
+
+## Integration Tests
+
+Since Hologres is a cloud service hosted on Alibaba Cloud, integration tests require a real Hologres instance. These tests are disabled by default and only run when the appropriate environment variables are set.
+
+### Prerequisites
+
+- A running Hologres instance on Alibaba Cloud
+- A database created in the Hologres instance
+- Credentials with sufficient permissions to create/drop schemas and tables
+
+### Environment Variables
+
+| Variable                      | Required | Description                                                          |
+|:------------------------------|:---------|:---------------------------------------------------------------------|
+| `GRAVITINO_HOLOGRES_JDBC_URL` | Yes      | Hologres JDBC URL, e.g. `jdbc:postgresql://<host>:<port>/<database>` |
+| `GRAVITINO_HOLOGRES_USERNAME` | Yes      | Hologres access key ID or username                                   |
+| `GRAVITINO_HOLOGRES_PASSWORD` | Yes      | Hologres access key secret or password                               |
+
+The tests are automatically enabled when all three environment variables are set and non-blank.
+
+### Running Integration Tests
+
+```bash
+# Set environment variables
+export GRAVITINO_HOLOGRES_JDBC_URL="jdbc:postgresql://<host>:<port>/<database>"
+export GRAVITINO_HOLOGRES_USERNAME="<your-username>"
+export GRAVITINO_HOLOGRES_PASSWORD="<your-password>"
+
+# Run integration tests
+./gradlew :catalogs-contrib:catalog-jdbc-hologres:test
+```
+
+### Running Unit Tests Only
+
+To skip integration tests and run only unit tests:
+
+```bash
+./gradlew :catalogs-contrib:catalog-jdbc-hologres:test -PskipITs
+```
+
+### Test Coverage
+
+The integration tests cover:
+
+- **Schema operations**: create, load, list, drop schemas
+- **Table CRUD**: create, load, alter (rename, add/delete/rename columns, update comments), drop tables
+- **Type conversion**: bool, int2, int4, int8, float4, float8, text, varchar, date, timestamp, timestamptz, numeric, bytea
+- **Distribution**: hash distribution key
+- **Table properties**: orientation, time_to_live_in_seconds, etc.
+- **Primary key index**: single and composite primary keys
+- **Column default values**: integer, varchar, boolean literals
+- **Schema comments**: create schema with comment
diff --git a/catalogs-contrib/catalog-jdbc-hologres/src/test/java/org/apache/gravitino/catalog/hologres/integration/test/CatalogHologresIT.java b/catalogs-contrib/catalog-jdbc-hologres/src/test/java/org/apache/gravitino/catalog/hologres/integration/test/CatalogHologresIT.java
new file mode 100644
index 0000000000..670d7aabe7
--- /dev/null
+++ b/catalogs-contrib/catalog-jdbc-hologres/src/test/java/org/apache/gravitino/catalog/hologres/integration/test/CatalogHologresIT.java
@@ -0,0 +1,952 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hologres.integration.test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.SupportsSchemas;
+import org.apache.gravitino.catalog.hologres.integration.test.service.HologresService;
+import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.integration.test.util.ITUtils;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+import org.junit.platform.commons.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for the Hologres JDBC catalog.
+ *
+ * <p>Since Hologres is a cloud service hosted on Alibaba Cloud, these tests require a real Hologres
+ * instance. Set the following environment variables to enable these tests:
+ *
+ * <ul>
+ *   <li>{@code GRAVITINO_HOLOGRES_JDBC_URL} - Hologres JDBC URL (e.g.
+ *       jdbc:postgresql://host:port/db)
+ *   <li>{@code GRAVITINO_HOLOGRES_USERNAME} - Hologres username
+ *   <li>{@code GRAVITINO_HOLOGRES_PASSWORD} - Hologres password
+ * </ul>
+ */
+@EnabledIfEnvironmentVariable(named = "GRAVITINO_TEST_CLOUD_IT", matches = "true")
+@EnabledIf(value = "hologresIsConfigured", disabledReason = "Hologres is not configured")
+@TestInstance(Lifecycle.PER_CLASS)
+public class CatalogHologresIT extends BaseIT {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CatalogHologresIT.class);
+  private static final String PROVIDER = "jdbc-hologres";
+
+  public static final String HOLOGRES_JDBC_URL = System.getenv("GRAVITINO_HOLOGRES_JDBC_URL");
+  public static final String HOLOGRES_USERNAME = System.getenv("GRAVITINO_HOLOGRES_USERNAME");
+  public static final String HOLOGRES_PASSWORD = System.getenv("GRAVITINO_HOLOGRES_PASSWORD");
+
+  private final String metalakeName = GravitinoITUtils.genRandomName("hologres_it_metalake");
+  private final String catalogName = GravitinoITUtils.genRandomName("hologres_it_catalog");
+  private final String schemaName = GravitinoITUtils.genRandomName("hologres_it_schema");
+  private final String tableName = GravitinoITUtils.genRandomName("hologres_it_table");
+  private final String tableComment = "table_comment";
+
+  private final String HOLOGRES_COL_NAME1 = "hologres_col_name1";
+  private final String HOLOGRES_COL_NAME2 = "hologres_col_name2";
+  private final String HOLOGRES_COL_NAME3 = "hologres_col_name3";
+
+  private GravitinoMetalake metalake;
+  protected Catalog catalog;
+  private HologresService hologresService;
+
+  protected static boolean hologresIsConfigured() {
+    return StringUtils.isNotBlank(HOLOGRES_JDBC_URL)
+        && StringUtils.isNotBlank(HOLOGRES_USERNAME)
+        && StringUtils.isNotBlank(HOLOGRES_PASSWORD);
+  }
+
+  @BeforeAll
+  public void startup() throws IOException {
+    hologresService = new HologresService(HOLOGRES_JDBC_URL, HOLOGRES_USERNAME, HOLOGRES_PASSWORD);
+    createMetalake();
+    createCatalog();
+    createSchema();
+  }
+
+  @AfterAll
+  public void stop() {
+    try {
+      clearTableAndSchema();
+      metalake.dropCatalog(catalogName);
+      client.dropMetalake(metalakeName);
+      hologresService.close();
+    } catch (Exception e) {
+      LOG.error("Failed to stop.", e);
+    }
+  }
+
+  @AfterEach
+  public void resetSchema() {
+    clearTableAndSchema();
+    createSchema();
+  }
+
+  private void clearTableAndSchema() {
+    NameIdentifier[] nameIdentifiers =
+        catalog.asTableCatalog().listTables(Namespace.of(schemaName));
+    for (NameIdentifier nameIdentifier : nameIdentifiers) {
+      catalog.asTableCatalog().dropTable(nameIdentifier);
+    }
+    catalog.asSchemas().dropSchema(schemaName, true);
+  }
+
+  private void createMetalake() {
+    GravitinoMetalake[] gravitinoMetalakes = client.listMetalakes();
+    Assertions.assertEquals(0, gravitinoMetalakes.length);
+
+    client.createMetalake(metalakeName, "comment", Collections.emptyMap());
+    GravitinoMetalake loadMetalake = client.loadMetalake(metalakeName);
+    Assertions.assertEquals(metalakeName, loadMetalake.name());
+
+    metalake = loadMetalake;
+  }
+
+  private void createCatalog() {
+    Map<String, String> catalogProperties = Maps.newHashMap();
+    catalogProperties.put(JdbcConfig.JDBC_URL.getKey(), HOLOGRES_JDBC_URL);
+    catalogProperties.put(JdbcConfig.JDBC_DRIVER.getKey(), "org.postgresql.Driver");
+    catalogProperties.put(JdbcConfig.USERNAME.getKey(), HOLOGRES_USERNAME);
+    catalogProperties.put(JdbcConfig.PASSWORD.getKey(), HOLOGRES_PASSWORD);
+
+    // Extract database name from JDBC URL for jdbc-database config
+    String database = extractDatabaseFromUrl(HOLOGRES_JDBC_URL);
+    if (database != null) {
+      catalogProperties.put(JdbcConfig.JDBC_DATABASE.getKey(), database);
+    }
+
+    Catalog createdCatalog =
+        metalake.createCatalog(
+            catalogName, Catalog.Type.RELATIONAL, PROVIDER, "comment", catalogProperties);
+    Catalog loadCatalog = metalake.loadCatalog(catalogName);
+    Assertions.assertEquals(createdCatalog, loadCatalog);
+
+    catalog = loadCatalog;
+  }
+
+  private String extractDatabaseFromUrl(String url) {
+    // JDBC URL format: jdbc:postgresql://host:port/database
+    int lastSlash = url.lastIndexOf('/');
+    int questionMark = url.indexOf('?', lastSlash);
+    if (lastSlash >= 0) {
+      if (questionMark >= 0) {
+        return url.substring(lastSlash + 1, questionMark);
+      }
+      return url.substring(lastSlash + 1);
+    }
+    return null;
+  }
+
+  private void createSchema() {
+    Map<String, String> prop = Maps.newHashMap();
+    Schema createdSchema = catalog.asSchemas().createSchema(schemaName, null, prop);
+    Schema loadSchema = catalog.asSchemas().loadSchema(schemaName);
+    Assertions.assertEquals(createdSchema.name(), loadSchema.name());
+  }
+
+  private Column[] createColumns() {
+    Column col1 = Column.of(HOLOGRES_COL_NAME1, Types.IntegerType.get(), "col_1_comment");
+    Column col2 = Column.of(HOLOGRES_COL_NAME2, Types.DateType.get(), "col_2_comment");
+    Column col3 = Column.of(HOLOGRES_COL_NAME3, Types.StringType.get(), "col_3_comment");
+    return new Column[] {col1, col2, col3};
+  }
+
+  private Map<String, String> createProperties() {
+    return Maps.newHashMap();
+  }
+
+  @Test
+  void testOperationHologresSchema() {
+    SupportsSchemas schemas = catalog.asSchemas();
+    Namespace namespace = Namespace.of(metalakeName, catalogName);
+
+    // List schema check
+    String[] nameIdentifiers = schemas.listSchemas();
+    Set<String> schemaNames = Sets.newHashSet(nameIdentifiers);
+    Assertions.assertTrue(schemaNames.contains(schemaName));
+
+    NameIdentifier[] hologresSchemas = hologresService.listSchemas(namespace);
+    schemaNames =
+        Arrays.stream(hologresSchemas).map(NameIdentifier::name).collect(Collectors.toSet());
+    Assertions.assertTrue(schemaNames.contains(schemaName));
+
+    // Create schema check
+    String testSchemaName = GravitinoITUtils.genRandomName("test_schema_1");
+    NameIdentifier schemaIdent = NameIdentifier.of(metalakeName, catalogName, testSchemaName);
+    schemas.createSchema(testSchemaName, null, Collections.emptyMap());
+    nameIdentifiers = schemas.listSchemas();
+    schemaNames = Sets.newHashSet(nameIdentifiers);
+    Assertions.assertTrue(schemaNames.contains(testSchemaName));
+
+    hologresSchemas = hologresService.listSchemas(namespace);
+    schemaNames =
+        Arrays.stream(hologresSchemas).map(NameIdentifier::name).collect(Collectors.toSet());
+    Assertions.assertTrue(schemaNames.contains(testSchemaName));
+
+    Map<String, String> emptyMap = Collections.emptyMap();
+    Assertions.assertThrows(
+        SchemaAlreadyExistsException.class,
+        () -> schemas.createSchema(testSchemaName, null, emptyMap));
+
+    // Drop schema check
+    schemas.dropSchema(testSchemaName, false);
+    Assertions.assertThrows(NoSuchSchemaException.class, () -> schemas.loadSchema(testSchemaName));
+    Assertions.assertThrows(
+        NoSuchSchemaException.class, () -> hologresService.loadSchema(schemaIdent));
+
+    nameIdentifiers = schemas.listSchemas();
+    schemaNames = Sets.newHashSet(nameIdentifiers);
+    Assertions.assertFalse(schemaNames.contains(testSchemaName));
+    Assertions.assertFalse(schemas.dropSchema("no_exits", false));
+  }
+
+  @Test
+  void testCreateAndLoadHologresTable() {
+    Column[] columns = createColumns();
+
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Distribution distribution = Distributions.NONE;
+    SortOrder[] sortOrders = new SortOrder[0];
+    Transform[] partitioning = Transforms.EMPTY_TRANSFORM;
+    Map<String, String> properties = createProperties();
+
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier, columns, tableComment, properties, partitioning, distribution, sortOrders);
+
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(tableName, loadTable.name());
+    Assertions.assertEquals(tableComment, loadTable.comment());
+    Assertions.assertEquals(columns.length, loadTable.columns().length);
+    for (int i = 0; i < columns.length; i++) {
+      ITUtils.assertColumn(columns[i], loadTable.columns()[i]);
+    }
+  }
+
+  @Test
+  void testColumnTypeConverter() {
+    Column[] columns =
+        new Column[] {
+          Column.of("bool_col", Types.BooleanType.get(), "bool column"),
+          Column.of("int2_col", Types.ShortType.get(), "int2 column"),
+          Column.of("int4_col", Types.IntegerType.get(), "int4 column"),
+          Column.of("int8_col", Types.LongType.get(), "int8 column"),
+          Column.of("float4_col", Types.FloatType.get(), "float4 column"),
+          Column.of("float8_col", Types.DoubleType.get(), "float8 column"),
+          Column.of("text_col", Types.StringType.get(), "text column"),
+          Column.of("varchar_col", Types.VarCharType.of(100), "varchar column"),
+          Column.of("date_col", Types.DateType.get(), "date column"),
+          Column.of("timestamp_col", Types.TimestampType.withoutTimeZone(), "timestamp column"),
+          Column.of("timestamptz_col", Types.TimestampType.withTimeZone(), "timestamptz column"),
+          Column.of("numeric_col", Types.DecimalType.of(10, 2), "numeric column"),
+          Column.of("bytea_col", Types.BinaryType.get(), "bytea column"),
+        };
+
+    String testTableName = GravitinoITUtils.genRandomName("type_converter_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, testTableName);
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        "type converter test",
+        ImmutableMap.of(),
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        new SortOrder[0]);
+
+    Table loadedTable = tableCatalog.loadTable(tableIdentifier);
+
+    for (Column column : loadedTable.columns()) {
+      switch (column.name()) {
+        case "bool_col":
+          Assertions.assertEquals(Types.BooleanType.get(), column.dataType());
+          break;
+        case "int2_col":
+          Assertions.assertEquals(Types.ShortType.get(), column.dataType());
+          break;
+        case "int4_col":
+          Assertions.assertEquals(Types.IntegerType.get(), column.dataType());
+          break;
+        case "int8_col":
+          Assertions.assertEquals(Types.LongType.get(), column.dataType());
+          break;
+        case "float4_col":
+          Assertions.assertEquals(Types.FloatType.get(), column.dataType());
+          break;
+        case "float8_col":
+          Assertions.assertEquals(Types.DoubleType.get(), column.dataType());
+          break;
+        case "text_col":
+          Assertions.assertEquals(Types.StringType.get(), column.dataType());
+          break;
+        case "varchar_col":
+          Assertions.assertEquals(Types.VarCharType.of(100), column.dataType());
+          break;
+        case "date_col":
+          Assertions.assertEquals(Types.DateType.get(), column.dataType());
+          break;
+        case "timestamp_col":
+          Assertions.assertTrue(column.dataType() instanceof Types.TimestampType);
+          Assertions.assertFalse(((Types.TimestampType) column.dataType()).hasTimeZone());
+          break;
+        case "timestamptz_col":
+          Assertions.assertTrue(column.dataType() instanceof Types.TimestampType);
+          Assertions.assertTrue(((Types.TimestampType) column.dataType()).hasTimeZone());
+          break;
+        case "numeric_col":
+          Assertions.assertEquals(Types.DecimalType.of(10, 2), column.dataType());
+          break;
+        case "bytea_col":
+          Assertions.assertEquals(Types.BinaryType.get(), column.dataType());
+          break;
+        default:
+          Assertions.fail("Unexpected column name: " + column.name());
+      }
+    }
+  }
+
+  @Test
+  void testAlterAndDropHologresTable() {
+    Column[] columns = createColumns();
+    String alterTableName = GravitinoITUtils.genRandomName("alter_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, alterTableName);
+
+    catalog
+        .asTableCatalog()
+        .createTable(tableIdentifier, columns, tableComment, createProperties());
+
+    // Test rename table
+    String newTableName = GravitinoITUtils.genRandomName("renamed_table");
+    catalog.asTableCatalog().alterTable(tableIdentifier, TableChange.rename(newTableName));
+
+    NameIdentifier newTableIdentifier = NameIdentifier.of(schemaName, newTableName);
+    Table table = catalog.asTableCatalog().loadTable(newTableIdentifier);
+    Assertions.assertEquals(newTableName, table.name());
+
+    // Test update table comment
+    catalog
+        .asTableCatalog()
+        .alterTable(newTableIdentifier, TableChange.updateComment(tableComment + "_new"));
+    table = catalog.asTableCatalog().loadTable(newTableIdentifier);
+    Assertions.assertTrue(table.comment().contains(tableComment + "_new"));
+
+    // Test add column
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            newTableIdentifier,
+            TableChange.addColumn(new String[] {"col_4"}, Types.StringType.get()));
+    table = catalog.asTableCatalog().loadTable(newTableIdentifier);
+    Assertions.assertEquals(4, table.columns().length);
+    Assertions.assertEquals("col_4", table.columns()[3].name());
+    Assertions.assertEquals(Types.StringType.get(), table.columns()[3].dataType());
+
+    // Test rename column
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            newTableIdentifier,
+            TableChange.renameColumn(new String[] {HOLOGRES_COL_NAME2}, "col_2_new"));
+    table = catalog.asTableCatalog().loadTable(newTableIdentifier);
+    Assertions.assertEquals("col_2_new", table.columns()[1].name());
+
+    // Test update column comment
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            newTableIdentifier,
+            TableChange.updateColumnComment(new String[] {HOLOGRES_COL_NAME1}, "new_comment"));
+    table = catalog.asTableCatalog().loadTable(newTableIdentifier);
+    Assertions.assertEquals("new_comment", table.columns()[0].comment());
+
+    // Test drop table
+    Assertions.assertTrue(catalog.asTableCatalog().dropTable(newTableIdentifier));
+    Assertions.assertFalse(catalog.asTableCatalog().dropTable(newTableIdentifier));
+  }
+
+  @Test
+  void testDropColumn() {
+    // Setup: Create a table with multiple columns
+    Column col1 = Column.of("id", Types.LongType.get(), "id column", false, false, null);
+    Column col2 = Column.of("name", Types.StringType.get(), "name column", true, false, null);
+    Column col3 = Column.of("value", Types.IntegerType.get(), "value column", true, false, null);
+    Column[] columns = new Column[] {col1, col2, col3};
+
+    String dropTableName = GravitinoITUtils.genRandomName("drop_col_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, dropTableName);
+
+    catalog
+        .asTableCatalog()
+        .createTable(tableIdentifier, columns, tableComment, createProperties());
+
+    // Test 1: Delete an existing column
+    catalog
+        .asTableCatalog()
+        .alterTable(tableIdentifier, TableChange.deleteColumn(new String[] {"value"}, false));
+
+    Table table = catalog.asTableCatalog().loadTable(tableIdentifier);
+    Assertions.assertEquals(2, table.columns().length);
+    Assertions.assertFalse(Arrays.stream(table.columns()).anyMatch(c -> c.name().equals("value")));
+
+    // Test 2: Delete non-existent column with ifExists=true (should succeed without error)
+    Assertions.assertDoesNotThrow(
+        () ->
+            catalog
+                .asTableCatalog()
+                .alterTable(
+                    tableIdentifier,
+                    TableChange.deleteColumn(new String[] {"non_existent_column"}, true)));
+
+    // Test 3: Delete non-existent column with ifExists=false (should throw
+    // IllegalArgumentException)
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            catalog
+                .asTableCatalog()
+                .alterTable(
+                    tableIdentifier,
+                    TableChange.deleteColumn(new String[] {"non_existent_column"}, false)));
+
+    // Test 4: Delete nested column (should throw UnsupportedOperationException)
+    Assertions.assertThrows(
+        UnsupportedOperationException.class,
+        () ->
+            catalog
+                .asTableCatalog()
+                .alterTable(
+                    tableIdentifier,
+                    TableChange.deleteColumn(new String[] {"nested", "column"}, false)));
+  }
+
+  @Test
+  void testCreateTableWithDistribution() {
+    Column col1 = Column.of("id", Types.LongType.get(), "id column", false, false, null);
+    Column col2 = Column.of("name", Types.StringType.get(), "name column", true, false, null);
+    Column col3 = Column.of("value", Types.IntegerType.get(), "value column", true, false, null);
+    Column[] columns = new Column[] {col1, col2, col3};
+
+    String distTableName = GravitinoITUtils.genRandomName("dist_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, distTableName);
+
+    Distribution distribution = Distributions.hash(0, NamedReference.field("id"));
+
+    Index[] indexes = new Index[] {Indexes.primary("pk", new String[][] {{"id"}})};
+
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        "table with distribution",
+        ImmutableMap.of(),
+        Transforms.EMPTY_TRANSFORM,
+        distribution,
+        new SortOrder[0],
+        indexes);
+
+    Table loadedTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(distTableName, loadedTable.name());
+
+    // Verify distribution
+    Distribution loadedDist = loadedTable.distribution();
+    Assertions.assertNotNull(loadedDist);
+    Assertions.assertNotEquals(Distributions.NONE, loadedDist);
+  }
+
+  @Test
+  void testCreateTableWithProperties() {
+    Column col1 = Column.of("id", Types.LongType.get(), "id column", false, false, null);
+    Column col2 = Column.of("name", Types.StringType.get(), "name column", true, false, null);
+    Column[] columns = new Column[] {col1, col2};
+
+    String propsTableName = GravitinoITUtils.genRandomName("props_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, propsTableName);
+
+    // Note: time_to_live_in_seconds is only supported in warehouse Hologres instances,
+    // so we only test the orientation property which is supported in all instances.
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("orientation", "column");
+
+    Index[] indexes = new Index[] {Indexes.primary("pk", new String[][] {{"id"}})};
+
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        "table with properties",
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        new SortOrder[0],
+        indexes);
+
+    Table loadedTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(propsTableName, loadedTable.name());
+
+    Map<String, String> loadedProps = loadedTable.properties();
+    Assertions.assertEquals("column", loadedProps.get("orientation"));
+  }
+
+  @Test
+  void testCreateTableWithPrimaryKey() {
+    Column col1 = Column.of("id", Types.LongType.get(), "id column", false, false, null);
+    Column col2 = Column.of("name", Types.StringType.get(), "name column", true, false, null);
+    Column col3 = Column.of("value", Types.IntegerType.get(), "value column", true, false, null);
+    Column[] columns = new Column[] {col1, col2, col3};
+
+    String pkTableName = GravitinoITUtils.genRandomName("pk_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, pkTableName);
+
+    Index[] indexes =
+        new Index[] {Indexes.primary("pk_id_name", new String[][] {{"id"}, {"name"}})};
+
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        "table with primary key",
+        ImmutableMap.of(),
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        new SortOrder[0],
+        indexes);
+
+    Table loadedTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(pkTableName, loadedTable.name());
+
+    // Verify primary key index exists
+    Index[] loadedIndexes = loadedTable.index();
+    Assertions.assertTrue(loadedIndexes.length > 0);
+
+    boolean hasPrimaryKey = false;
+    for (Index index : loadedIndexes) {
+      if (index.type() == Index.IndexType.PRIMARY_KEY) {
+        hasPrimaryKey = true;
+        // Verify the primary key columns
+        Set<String> pkColumns =
+            Arrays.stream(index.fieldNames()).flatMap(Arrays::stream).collect(Collectors.toSet());
+        Assertions.assertTrue(pkColumns.contains("id"));
+        Assertions.assertTrue(pkColumns.contains("name"));
+      }
+    }
+    Assertions.assertTrue(hasPrimaryKey, "Table should have a primary key");
+  }
+
+  @Test
+  void testColumnDefaultValue() {
+    Column col1 =
+        Column.of(
+            HOLOGRES_COL_NAME1,
+            Types.IntegerType.get(),
+            "col_1_comment",
+            false,
+            false,
+            Literals.integerLiteral(42));
+    Column col2 =
+        Column.of(
+            HOLOGRES_COL_NAME2,
+            Types.VarCharType.of(255),
+            "col_2_comment",
+            true,
+            false,
+            Literals.NULL);
+    Column col3 =
+        Column.of(
+            HOLOGRES_COL_NAME3,
+            Types.BooleanType.get(),
+            "col_3_comment",
+            false,
+            false,
+            Literals.booleanLiteral(true));
+
+    Column[] newColumns = new Column[] {col1, col2, col3};
+
+    String defaultValueTableName = GravitinoITUtils.genRandomName("default_value_table");
+    NameIdentifier tableIdent = NameIdentifier.of(schemaName, defaultValueTableName);
+
+    Index[] indexes = new Index[] {Indexes.primary("pk", new String[][] {{HOLOGRES_COL_NAME1}})};
+
+    catalog
+        .asTableCatalog()
+        .createTable(
+            tableIdent,
+            newColumns,
+            null,
+            ImmutableMap.of(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            new SortOrder[0],
+            indexes);
+
+    Table createdTable = catalog.asTableCatalog().loadTable(tableIdent);
+    Assertions.assertEquals(Literals.NULL, createdTable.columns()[1].defaultValue());
+  }
+
+  @Test
+  void testSchemaComment() {
+    String testSchemaName = GravitinoITUtils.genRandomName("schema_comment_test");
+
+    // Hologres supports schema comment via COMMENT ON SCHEMA
+    Schema schema = catalog.asSchemas().createSchema(testSchemaName, "test schema comment", null);
+    Assertions.assertNotNull(schema);
+
+    Schema loadedSchema = catalog.asSchemas().loadSchema(testSchemaName);
+    Assertions.assertEquals("test schema comment", loadedSchema.comment());
+
+    // Clean up
+    catalog.asSchemas().dropSchema(testSchemaName, true);
+  }
+
+  @Test
+  void testDropHologresSchema() {
+    String testSchemaName = GravitinoITUtils.genRandomName("hologres_drop_schema").toLowerCase();
+    String testTableName = GravitinoITUtils.genRandomName("hologres_drop_table").toLowerCase();
+
+    catalog
+        .asSchemas()
+        .createSchema(testSchemaName, null, ImmutableMap.<String, String>builder().build());
+
+    catalog
+        .asTableCatalog()
+        .createTable(
+            NameIdentifier.of(testSchemaName, testTableName),
+            createColumns(),
+            "Created by Gravitino client",
+            ImmutableMap.<String, String>builder().build());
+
+    // Try to drop a schema with cascade = true
+    catalog.asSchemas().dropSchema(testSchemaName, true);
+
+    // Check schema has been dropped
+    SupportsSchemas schemas = catalog.asSchemas();
+    Assertions.assertThrows(NoSuchSchemaException.class, () -> schemas.loadSchema(testSchemaName));
+  }
+
+  @Test
+  void testCreateTableWithMultipleProperties() {
+    // Test table with bitmap_columns, clustering_key, dictionary_encoding_columns, segment_key
+    Column[] columns =
+        new Column[] {
+          Column.of("l_orderkey", Types.LongType.get(), "order key", false, false, null),
+          Column.of("l_partkey", Types.IntegerType.get(), "part key", false, false, null),
+          Column.of("l_shipdate", Types.DateType.get(), "ship date", false, false, null),
+          Column.of("l_returnflag", Types.StringType.get(), "return flag", false, false, null),
+          Column.of("l_quantity", Types.DecimalType.of(15, 2), "quantity", false, false, null),
+        };
+
+    String tableName = GravitinoITUtils.genRandomName("multi_props_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("orientation", "column");
+    properties.put("bitmap_columns", "l_returnflag");
+    properties.put("clustering_key", "l_shipdate:asc");
+    properties.put("dictionary_encoding_columns", "l_returnflag:auto");
+    properties.put("segment_key", "l_shipdate");
+
+    Index[] indexes =
+        new Index[] {Indexes.primary("pk", new String[][] {{"l_orderkey"}, {"l_partkey"}})};
+    Distribution distribution = Distributions.hash(0, NamedReference.field("l_orderkey"));
+
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        "table with multiple properties",
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        distribution,
+        new SortOrder[0],
+        indexes);
+
+    Table loadedTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(tableName, loadedTable.name());
+
+    Map<String, String> loadedProps = loadedTable.properties();
+    Assertions.assertEquals("column", loadedProps.get("orientation"));
+    Assertions.assertNotNull(loadedProps.get("bitmap_columns"));
+    Assertions.assertNotNull(loadedProps.get("clustering_key"));
+  }
+
+  @Test
+  void testCreateTableWithBinlogProperties() {
+    // Test table with binlog properties
+    // Note: orientation = "column,row" is only supported in warehouse instances,
+    // so we use "column" orientation which is supported in all instances.
+    Column[] columns =
+        new Column[] {
+          Column.of("order_id", Types.LongType.get(), "order id", false, false, null),
+          Column.of("shop_id", Types.IntegerType.get(), "shop id", false, false, null),
+          Column.of("user_id", Types.StringType.get(), "user id", false, false, null),
+          Column.of("order_amount", Types.DecimalType.of(12, 2), "order amount", true, false, null),
+          Column.of(
+              "order_time", Types.TimestampType.withTimeZone(), "order time", false, false, null),
+        };
+
+    String tableName = GravitinoITUtils.genRandomName("binlog_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("orientation", "column");
+    properties.put("binlog_level", "replica");
+    properties.put("binlog_ttl", "86400");
+
+    Index[] indexes = new Index[] {Indexes.primary("pk", new String[][] {{"order_id"}})};
+    Distribution distribution = Distributions.hash(0, NamedReference.field("order_id"));
+
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        "table with binlog",
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        distribution,
+        new SortOrder[0],
+        indexes);
+
+    Table loadedTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(tableName, loadedTable.name());
+
+    Map<String, String> loadedProps = loadedTable.properties();
+    // Note: binlog properties may not be returned by some Hologres instance types
+    // (e.g., Serverless instances). We verify the table was created successfully,
+    // but don't strictly assert on binlog properties.
+    // If binlog properties exist, verify they were set correctly.
+    if (loadedProps.containsKey("binlog.level")) {
+      Assertions.assertEquals("replica", loadedProps.get("binlog.level"));
+    }
+  }
+
+  @Test
+  void testCreateTableWithoutPrimaryKeyAndDistribution() {
+    // Test table without primary key and distribution key
+    Column[] columns =
+        new Column[] {
+          Column.of("order_id", Types.LongType.get(), "order id", false, false, null),
+          Column.of("shop_id", Types.IntegerType.get(), "shop id", false, false, null),
+          Column.of("user_id", Types.StringType.get(), "user id", false, false, null),
+          Column.of("order_amount", Types.DecimalType.of(12, 2), "order amount", true, false, null),
+        };
+
+    String tableName = GravitinoITUtils.genRandomName("no_pk_dist_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("orientation", "column");
+    properties.put("clustering_key", "order_id:asc");
+
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        "table without pk and distribution",
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        new SortOrder[0],
+        Indexes.EMPTY_INDEXES);
+
+    Table loadedTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(tableName, loadedTable.name());
+
+    // Verify no primary key
+    Index[] loadedIndexes = loadedTable.index();
+    boolean hasPrimaryKey =
+        Arrays.stream(loadedIndexes).anyMatch(idx -> idx.type() == Index.IndexType.PRIMARY_KEY);
+    Assertions.assertFalse(hasPrimaryKey, "Table should not have a primary key");
+  }
+
+  @Test
+  void testCreatePhysicalPartitionTable() {
+    // Test physical partition table: PARTITION BY LIST
+    Column[] columns =
+        new Column[] {
+          Column.of("order_id", Types.LongType.get(), "order id", false, false, null),
+          Column.of("shop_id", Types.IntegerType.get(), "shop id", false, false, null),
+          Column.of("ds", Types.StringType.get(), "partition column", false, false, null),
+        };
+
+    String tableName = GravitinoITUtils.genRandomName("physical_pt_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("orientation", "column");
+
+    Index[] indexes = new Index[] {Indexes.primary("pk", new String[][] {{"order_id"}, {"ds"}})};
+    Distribution distribution = Distributions.hash(0, NamedReference.field("order_id"));
+
+    // Physical partition: PARTITION BY LIST(ds)
+    Transform[] partitioning = new Transform[] {Transforms.list(new String[][] {{"ds"}})};
+
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        "physical partition table",
+        properties,
+        partitioning,
+        distribution,
+        new SortOrder[0],
+        indexes);
+
+    Table loadedTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(tableName, loadedTable.name());
+
+    // Verify partitioning
+    Transform[] loadedPartitioning = loadedTable.partitioning();
+    Assertions.assertEquals(1, loadedPartitioning.length);
+    Assertions.assertTrue(loadedPartitioning[0] instanceof Transforms.ListTransform);
+  }
+
+  @Test
+  void testCreateLogicalPartitionTable() {
+    // Test logical partition table: LOGICAL PARTITION BY LIST
+    Column[] columns =
+        new Column[] {
+          Column.of("order_id", Types.LongType.get(), "order id", false, false, null),
+          Column.of("shop_id", Types.IntegerType.get(), "shop id", false, false, null),
+          Column.of("ds", Types.DateType.get(), "partition column", false, false, null),
+        };
+
+    String tableName = GravitinoITUtils.genRandomName("logical_pt_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("orientation", "column");
+    properties.put("is_logical_partitioned_table", "true");
+    properties.put("partition_expiration_time", "30 day");
+
+    Index[] indexes = new Index[] {Indexes.primary("pk", new String[][] {{"order_id"}, {"ds"}})};
+    Distribution distribution = Distributions.hash(0, NamedReference.field("order_id"));
+
+    // Logical partition: LOGICAL PARTITION BY LIST(ds)
+    Transform[] partitioning = new Transform[] {Transforms.list(new String[][] {{"ds"}})};
+
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        "logical partition table",
+        properties,
+        partitioning,
+        distribution,
+        new SortOrder[0],
+        indexes);
+
+    Table loadedTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(tableName, loadedTable.name());
+
+    // Verify partitioning
+    Transform[] loadedPartitioning = loadedTable.partitioning();
+    Assertions.assertEquals(1, loadedPartitioning.length);
+    Assertions.assertTrue(loadedPartitioning[0] instanceof Transforms.ListTransform);
+
+    // Verify logical partition property
+    Map<String, String> loadedProps = loadedTable.properties();
+    Assertions.assertEquals("true", loadedProps.get("is_logical_partitioned_table"));
+  }
+
+  @Test
+  void testCreateTwoLevelLogicalPartitionTable() {
+    // Test two-level logical partition table: LOGICAL PARTITION BY LIST(yy, mm)
+    Column[] columns =
+        new Column[] {
+          Column.of("order_id", Types.LongType.get(), "order id", false, false, null),
+          Column.of("yy", Types.StringType.get(), "year partition", false, false, null),
+          Column.of("mm", Types.StringType.get(), "month partition", false, false, null),
+        };
+
+    String tableName = GravitinoITUtils.genRandomName("two_level_pt_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("orientation", "column");
+    properties.put("is_logical_partitioned_table", "true");
+
+    Distribution distribution = Distributions.hash(0, NamedReference.field("order_id"));
+
+    // Two-level logical partition: LOGICAL PARTITION BY LIST(yy, mm)
+    Transform[] partitioning = new Transform[] {Transforms.list(new String[][] {{"yy"}, {"mm"}})};
+
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        "two-level logical partition table",
+        properties,
+        partitioning,
+        distribution,
+        new SortOrder[0],
+        Indexes.EMPTY_INDEXES);
+
+    Table loadedTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(tableName, loadedTable.name());
+
+    // Verify partitioning
+    Transform[] loadedPartitioning = loadedTable.partitioning();
+    Assertions.assertEquals(1, loadedPartitioning.length);
+    Assertions.assertTrue(loadedPartitioning[0] instanceof Transforms.ListTransform);
+
+    // Verify logical partition property
+    Map<String, String> loadedProps = loadedTable.properties();
+    Assertions.assertEquals("true", loadedProps.get("is_logical_partitioned_table"));
+  }
+}
diff --git a/catalogs-contrib/catalog-jdbc-hologres/src/test/java/org/apache/gravitino/catalog/hologres/integration/test/service/HologresService.java b/catalogs-contrib/catalog-jdbc-hologres/src/test/java/org/apache/gravitino/catalog/hologres/integration/test/service/HologresService.java
new file mode 100644
index 0000000000..889f2d681d
--- /dev/null
+++ b/catalogs-contrib/catalog-jdbc-hologres/src/test/java/org/apache/gravitino/catalog/hologres/integration/test/service/HologresService.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hologres.integration.test.service;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.catalog.jdbc.JdbcSchema;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.meta.AuditInfo;
+
+/**
+ * A helper service that directly connects to Hologres via JDBC for verification in integration
+ * tests.
+ */
+public class HologresService {
+
+  private final Connection connection;
+
+  public HologresService(String jdbcUrl, String username, String password) {
+    try {
+      connection = DriverManager.getConnection(jdbcUrl, username, password);
+    } catch (SQLException e) {
+      throw new RuntimeException("Failed to connect to Hologres: " + e.getMessage(), e);
+    }
+  }
+
+  public NameIdentifier[] listSchemas(Namespace namespace) {
+    List<String> schemas = new ArrayList<>();
+    try (ResultSet resultSet = connection.getMetaData().getSchemas(connection.getCatalog(), null)) {
+      while (resultSet.next()) {
+        schemas.add(resultSet.getString("TABLE_SCHEM"));
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return schemas.stream()
+        .map(s -> NameIdentifier.of(org.apache.commons.lang3.ArrayUtils.add(namespace.levels(), s)))
+        .toArray(NameIdentifier[]::new);
+  }
+
+  public JdbcSchema loadSchema(NameIdentifier schemaIdent) {
+    String schemaName = schemaIdent.name();
+    String query = "SELECT nspname FROM pg_catalog.pg_namespace WHERE nspname = ?";
+    try (PreparedStatement preparedStatement = connection.prepareStatement(query)) {
+      preparedStatement.setString(1, schemaName);
+      try (ResultSet resultSet = preparedStatement.executeQuery()) {
+        if (!resultSet.next()) {
+          throw new NoSuchSchemaException("Schema %s could not be found", schemaName);
+        }
+        return JdbcSchema.builder().withName(schemaName).withAuditInfo(AuditInfo.EMPTY).build();
+      }
+    } catch (final SQLException se) {
+      throw new RuntimeException(se);
+    }
+  }
+
+  public void executeQuery(String sql) {
+    try (Statement statement = connection.createStatement()) {
+      statement.execute(sql);
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void close() {
+    try {
+      connection.close();
+    } catch (SQLException e) {
+      // ignore
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
index dc06c0702f..7ccbd3cb3d 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
@@ -117,7 +117,7 @@ public class CatalogManager implements CatalogDispatcher, Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class);
 
   private static final Set<String> CONTRIB_CATALOGS_TYPES =
-      ImmutableSet.of("jdbc-oceanbase", "jdbc-clickhouse");
+      ImmutableSet.of("jdbc-oceanbase", "jdbc-clickhouse", "jdbc-hologres");
 
   /** Wrapper class for a catalog instance and its class loader. */
   public static class CatalogWrapper {
