This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit 925111bdbfb7ec3cc3a54b3752c9300a56ad43b2
Author: Yuhui <[email protected]>
AuthorDate: Thu Jan 29 19:12:17 2026 +0800

    [#9718] feat (trino-connector): Split Trino connector to common layer and 
adapter for Trino SIP changes (#9735)
    
    Split Trino connector to common layer and adapter for Trino SIP changes
    
    Fix: #9718
    
    NO
    
    UTs
---
 .github/workflows/frontend-integration-test.yml    |  19 +-
 .github/workflows/trino-integration-test.yml       |   7 +-
 trino-connector/trino-connector/build.gradle.kts   |  34 +---
 .../trino/connector/GravitinoConnector.java        |  37 ++--
 .../trino/connector/GravitinoConnectorFactory.java | 109 ++++++++++--
 .../trino/connector/GravitinoMetadata.java         |  54 +-----
 .../GravitinoNodePartitioningProvider.java         |  13 --
 .../gravitino/trino/connector/GravitinoPlugin.java |  28 ++-
 .../gravitino/trino/connector/GravitinoSplit.java  |   7 +-
 .../trino/connector/GravitinoSplitManager.java     |   9 +-
 .../trino/connector/GravitinoSplitSource.java      |   9 +-
 .../connector/catalog/CatalogConnectorContext.java |  11 +-
 .../connector/catalog/CatalogConnectorManager.java |  61 ++++---
 .../catalog/CatalogConnectorMetadata.java          |  21 +++
 .../trino/connector/catalog/CatalogRegister.java   |  67 ++-----
 .../connector/system/GravitinoSystemConnector.java |  50 ++++--
 .../trino/connector/util/json/BlockJsonSerde.java  |  13 +-
 .../connector/AbstractGravitinoConnectorTest.java  |  79 +++++++++
 .../trino/connector/GravitinoMockServer.java       |  56 +++++-
 .../connector/TestCreateGravitinoConnector.java    | 104 -----------
 .../trino/connector/TestGravitinoConfig.java       |   8 -
 .../trino/connector/TestGravitinoConnector.java    | 112 +++++-------
 .../connector/TestGravitinoConnectorFactory.java   |  35 ----
 .../TestGravitinoConnectorNullChecks.java          |  31 +++-
 ...tGravitinoConnectorWithMetalakeCatalogName.java | 123 +++++--------
 .../TestGravitinoConnectorWithSkipCatalog.java     | 128 -------------
 .../TestGravitinoMetadataGetSystemTable.java       |  13 +-
 .../trino/connector/TestGravitinoPlugin.java       |  45 -----
 .../catalog/TestCatalogConnectorManager.java       | 197 +++++++++++++++++++++
 .../catalog/hive/TestHiveDataTypeConverter.java    |   5 +-
 .../TestPostgreSQLDataTypeTransformer.java         |  14 +-
 .../system/TestGravitinoSystemConnector.java       |  25 ++-
 32 files changed, 803 insertions(+), 721 deletions(-)

diff --git a/.github/workflows/frontend-integration-test.yml 
b/.github/workflows/frontend-integration-test.yml
index d9f4232bc1..71f788371c 100644
--- a/.github/workflows/frontend-integration-test.yml
+++ b/.github/workflows/frontend-integration-test.yml
@@ -1,11 +1,8 @@
 name: Frontend Integration Test
 
 # Controls when the workflow will run
-on:
-  push:
-    branches: [ "main", "branch-*" ]
-  pull_request:
-    branches: [ "main", "branch-*" ]
+# Disable it temporarily.
+on: {}
 
 concurrency:
   group: ${{ github.workflow }}-${{ github.event.pull_request.number || 
github.ref }}
@@ -37,7 +34,7 @@ jobs:
               - scripts/**
               - server/**
               - server-common/**
-              - web-v2/**
+              - web/**
               - build.gradle.kts
               - gradle.properties
               - gradlew
@@ -49,7 +46,7 @@ jobs:
     needs: changes
     if: needs.changes.outputs.source_changes == 'true'
     runs-on: ubuntu-latest
-    timeout-minutes: 90
+    timeout-minutes: 60
     strategy:
       matrix:
         # Integration test for AMD64 architecture
@@ -75,7 +72,7 @@ jobs:
 
       - name: Package Gravitino
         run: |
-          GRAVITINO_USE_WEB_V2=true ./gradlew compileDistribution -x test 
+          ./gradlew compileDistribution -x test 
 
       - name: Free up disk space
         run: |
@@ -84,8 +81,8 @@ jobs:
       - name: Frontend Integration Test
         id: integrationTest
         run: |
-          GRAVITINO_USE_WEB_V2=true ./gradlew -PskipTests -PtestMode=embedded 
-PskipDockerTests=false :web-v2:integration-test:test
-          GRAVITINO_USE_WEB_V2=true ./gradlew -PskipTests -PtestMode=deploy 
-PskipDockerTests=false :web-v2:integration-test:test
+          ./gradlew -PskipTests -PtestMode=embedded -PskipDockerTests=false 
:web:integration-test:test
+          ./gradlew -PskipTests -PtestMode=deploy -PskipDockerTests=false 
:web:integration-test:test
 
       - name: Upload integrate tests reports
         uses: actions/upload-artifact@v4
@@ -99,4 +96,4 @@ jobs:
             distribution/package/logs/gravitino-server.log
             catalogs/**/*.log
             catalogs/**/*.tar
-            web-v2/integration-test/build/*.log
\ No newline at end of file
+            web/integration-test/build/*.log
diff --git a/.github/workflows/trino-integration-test.yml 
b/.github/workflows/trino-integration-test.yml
index 6f80fc7bad..a8097e0337 100644
--- a/.github/workflows/trino-integration-test.yml
+++ b/.github/workflows/trino-integration-test.yml
@@ -1,11 +1,8 @@
 name: Trino Integration Test
 
 # Controls when the workflow will run
-on:
-  push:
-    branches: [ "main", "branch-*" ]
-  pull_request:
-    branches: [ "main", "branch-*" ]
+# Disable it temporarily.
+on: {}
 
 concurrency:
   group: ${{ github.workflow }}-${{ github.event.pull_request.number || 
github.ref }}
diff --git a/trino-connector/trino-connector/build.gradle.kts 
b/trino-connector/trino-connector/build.gradle.kts
index c9bd1d8d6c..39c7a078c0 100644
--- a/trino-connector/trino-connector/build.gradle.kts
+++ b/trino-connector/trino-connector/build.gradle.kts
@@ -16,8 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 plugins {
-  `maven-publish`
   id("java")
   id("idea")
 }
@@ -26,6 +26,10 @@ repositories {
   mavenCentral()
 }
 
+val trinoVersionProvider =
+  providers.gradleProperty("trinoVersion").map { it.toInt() }.orElse(435)
+val trinoVersion = trinoVersionProvider.get()
+
 dependencies {
   implementation(project(":catalogs:catalog-common"))
   implementation(project(":clients:client-java-runtime", configuration = 
"shadow"))
@@ -33,40 +37,20 @@ dependencies {
   implementation(libs.bundles.log4j)
   implementation(libs.commons.collections4)
   implementation(libs.commons.lang3)
-  implementation(libs.trino.jdbc)
+  implementation("io.trino:trino-jdbc:$trinoVersion")
   compileOnly(libs.airlift.resolver)
-  compileOnly(libs.trino.spi) {
+  compileOnly("io.trino:trino-spi:$trinoVersion") {
     exclude("org.apache.logging.log4j")
   }
   testImplementation(libs.awaitility)
   testImplementation(libs.mockito.core)
   testImplementation(libs.mysql.driver)
-  testImplementation(libs.trino.memory) {
+  testImplementation("io.trino:trino-memory:$trinoVersion") {
     exclude("org.antlr")
     exclude("org.apache.logging.log4j")
   }
-  testImplementation(libs.trino.testing) {
+  testImplementation("io.trino:trino-testing:$trinoVersion") {
     exclude("org.apache.logging.log4j")
   }
   testRuntimeOnly(libs.junit.jupiter.engine)
 }
-
-tasks.named("generateMetadataFileForMavenJavaPublication") {
-  dependsOn(":trino-connector:trino-connector:copyDepends")
-}
-
-tasks {
-  val copyDepends by registering(Copy::class) {
-    from(configurations.runtimeClasspath)
-    into("build/libs")
-  }
-  jar {
-    finalizedBy(copyDepends)
-  }
-
-  register("copyLibs", Copy::class) {
-    dependsOn(copyDepends, "build")
-    from("build/libs")
-    into("$rootDir/distribution/${rootProject.name}-trino-connector")
-  }
-}
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java
index c9b48b50d0..6247be1d4f 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java
@@ -18,7 +18,10 @@
  */
 package org.apache.gravitino.trino.connector;
 
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+
 import com.google.common.base.Preconditions;
+import io.trino.spi.TrinoException;
 import io.trino.spi.connector.Connector;
 import io.trino.spi.connector.ConnectorAccessControl;
 import io.trino.spi.connector.ConnectorCapabilities;
@@ -35,9 +38,9 @@ import io.trino.spi.transaction.IsolationLevel;
 import java.util.List;
 import java.util.Set;
 import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.client.GravitinoMetalake;
 import org.apache.gravitino.trino.connector.catalog.CatalogConnectorContext;
 import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
+import 
org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
 
 /**
  * GravitinoConnector serves as the entry point for operations on the 
connector managed by Trino and
@@ -47,19 +50,20 @@ import 
org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
 public class GravitinoConnector implements Connector {
 
   private final NameIdentifier catalogIdentifier;
-  private final CatalogConnectorContext catalogConnectorContext;
+  protected final CatalogConnectorContext catalogConnectorContext;
+  private final CatalogConnectorMetadata connectorMetadata;
 
   /**
    * Constructs a new GravitinoConnector with the specified catalog identifier 
and catalog connector
    * context.
    *
-   * @param catalogIdentifier the catalog identifier
    * @param catalogConnectorContext the catalog connector context
    */
-  public GravitinoConnector(
-      NameIdentifier catalogIdentifier, CatalogConnectorContext 
catalogConnectorContext) {
-    this.catalogIdentifier = catalogIdentifier;
+  public GravitinoConnector(CatalogConnectorContext catalogConnectorContext) {
+    this.catalogIdentifier = 
catalogConnectorContext.getCatalog().geNameIdentifier();
     this.catalogConnectorContext = catalogConnectorContext;
+    this.connectorMetadata =
+        new CatalogConnectorMetadata(catalogConnectorContext.getMetalake(), 
this.catalogIdentifier);
   }
 
   @Override
@@ -86,14 +90,15 @@ public class GravitinoConnector implements Connector {
     ConnectorMetadata internalMetadata =
         internalConnector.getMetadata(session, 
gravitinoTransactionHandle.getInternalHandle());
     Preconditions.checkArgument(internalMetadata != null, "Internal metadata 
must not be null");
+    return createGravitinoMetadata(
+        connectorMetadata, catalogConnectorContext.getMetadataAdapter(), 
internalMetadata);
+  }
 
-    GravitinoMetalake metalake = catalogConnectorContext.getMetalake();
-
-    CatalogConnectorMetadata catalogConnectorMetadata =
-        new CatalogConnectorMetadata(metalake, catalogIdentifier);
-
-    return new GravitinoMetadata(
-        catalogConnectorMetadata, 
catalogConnectorContext.getMetadataAdapter(), internalMetadata);
+  protected GravitinoMetadata createGravitinoMetadata(
+      CatalogConnectorMetadata catalogConnectorMetadata,
+      CatalogConnectorMetadataAdapter metadataAdapter,
+      ConnectorMetadata internalMetadata) {
+    throw new TrinoException(NOT_SUPPORTED, "Should be overridden in 
subclass");
   }
 
   @Override
@@ -172,4 +177,10 @@ public class GravitinoConnector implements Connector {
         internalConnector.getNodePartitioningProvider();
     return new GravitinoNodePartitioningProvider(nodePartitioningProvider);
   }
+
+  public void shutdown() {
+    Connector internalConnector = 
catalogConnectorContext.getInternalConnector();
+    internalConnector.shutdown();
+    catalogConnectorContext.close();
+  }
 }
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnectorFactory.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnectorFactory.java
index 2bbcf61e4e..c9fd7e9392 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnectorFactory.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnectorFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.gravitino.trino.connector;
 
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static 
org.apache.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_RUNTIME_ERROR;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -28,9 +29,9 @@ import io.trino.spi.connector.Connector;
 import io.trino.spi.connector.ConnectorContext;
 import io.trino.spi.connector.ConnectorFactory;
 import java.util.Map;
-import java.util.function.Supplier;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.trino.connector.catalog.CatalogConnectorContext;
 import org.apache.gravitino.trino.connector.catalog.CatalogConnectorFactory;
 import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager;
 import org.apache.gravitino.trino.connector.catalog.CatalogRegister;
@@ -45,6 +46,8 @@ import org.slf4j.LoggerFactory;
 public class GravitinoConnectorFactory implements ConnectorFactory {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(GravitinoConnectorFactory.class);
+  private static final int MIN_SUPPORT_TRINO_SPI_VERSION = 435;
+  private static final int MAX_SUPPORT_TRINO_SPI_VERSION = Integer.MAX_VALUE;
   /** The default connector name. */
   public static final String DEFAULT_CONNECTOR_NAME = "gravitino";
 
@@ -53,6 +56,13 @@ public class GravitinoConnectorFactory implements 
ConnectorFactory {
 
   private CatalogConnectorManager catalogConnectorManager;
 
+  private GravitinoAdminClient client;
+  private int trinoVersion;
+
+  public GravitinoConnectorFactory(GravitinoAdminClient client) {
+    this.client = client;
+  }
+
   @Override
   public String getName() {
     return DEFAULT_CONNECTOR_NAME;
@@ -74,25 +84,31 @@ public class GravitinoConnectorFactory implements 
ConnectorFactory {
    *
    * @param catalogName the connector name of catalog
    * @param requiredConfig the config of connector
-   * @param context Trino connector context
    * @return Trino connector
    */
   @Override
   public Connector create(
-      String catalogName, Map<String, String> requiredConfig, ConnectorContext 
context) {
+      String catalogName,
+      Map<String, String> requiredConfig,
+      ConnectorContext trinoConnectorContext) {
     Preconditions.checkArgument(requiredConfig != null, "requiredConfig is not 
null");
     GravitinoConfig config = new GravitinoConfig(requiredConfig);
 
     synchronized (this) {
       if (catalogConnectorManager == null) {
+        checkTrinoSpiVersion(trinoConnectorContext, config);
         try {
           CatalogRegister catalogRegister = new CatalogRegister();
 
           CatalogConnectorFactory catalogConnectorFactory = 
createCatalogConnectorFactory(config);
           catalogConnectorManager =
-              new CatalogConnectorManager(catalogRegister, 
catalogConnectorFactory);
-          catalogConnectorManager.config(config, clientProvider().get());
-          catalogConnectorManager.start(context);
+              new CatalogConnectorManager(
+                  catalogRegister, catalogConnectorFactory, 
this::getTrinoCatalogName);
+          catalogConnectorManager.config(config, client);
+
+          if (isCoordinator(trinoConnectorContext)) {
+            catalogConnectorManager.start();
+          }
 
           gravitinoSystemTableFactory = new 
GravitinoSystemTableFactory(catalogConnectorManager);
         } catch (Exception e) {
@@ -106,7 +122,12 @@ public class GravitinoConnectorFactory implements 
ConnectorFactory {
     if (config.isDynamicConnector()) {
       // The dynamic connector is an instance of GravitinoConnector. It is 
loaded from Gravitino
       // server.
-      return catalogConnectorManager.createConnector(catalogName, config, 
context);
+      CatalogConnectorContext catalogConnectorContext =
+          catalogConnectorManager.createCatalogConnectorContext(
+              catalogName, config, trinoConnectorContext);
+      GravitinoConnector catalogConnector = 
createConnector(catalogConnectorContext);
+      catalogConnectorContext.bindConnector(catalogConnector);
+      return catalogConnectorContext.getConnector();
     } else {
       // The static connector is an instance of GravitinoSystemConnector. It 
is loaded by Trino
       // using the connector configuration.
@@ -117,13 +138,75 @@ public class GravitinoConnectorFactory implements 
ConnectorFactory {
       }
       GravitinoStoredProcedureFactory gravitinoStoredProcedureFactory =
           new GravitinoStoredProcedureFactory(catalogConnectorManager, 
metalake);
-      return new GravitinoSystemConnector(gravitinoStoredProcedureFactory);
+      return createSystemConnector(gravitinoStoredProcedureFactory);
     }
   }
 
-  @VisibleForTesting
-  Supplier<GravitinoAdminClient> clientProvider() {
-    return () -> null;
+  protected GravitinoConnector createConnector(CatalogConnectorContext 
connectorContext) {
+    throw new TrinoException(NOT_SUPPORTED, "Should be overridden in 
subclass");
+  }
+
+  protected GravitinoSystemConnector createSystemConnector(
+      GravitinoStoredProcedureFactory storedProcedureFactory) {
+    return new GravitinoSystemConnector(storedProcedureFactory);
+  }
+
+  protected String getTrinoCatalogName(String metalakeName, String 
catalogName) {
+    return "\"" + metalakeName + "." + catalogName + "\"";
+  }
+
+  private void checkTrinoSpiVersion(ConnectorContext context, GravitinoConfig 
config) {
+    String spiVersion = context.getSpiVersion();
+    trinoVersion = Integer.parseInt(spiVersion);
+
+    // check catalog name with metalake are supported in this trino version
+    if (!config.singleMetalakeMode() && !supportCatalogNameWithMetalake()) {
+      String errmsg =
+          String.format(
+              "The trino-connector-%s-%s does not support catalog name with 
metalake.",
+              getMinSupportTrinoSpiVersion(), getMaxSupportTrinoSpiVersion());
+      throw new 
TrinoException(GravitinoErrorCode.GRAVITINO_UNSUPPORTED_TRINO_VERSION, errmsg);
+    }
+
+    // skip version validation
+    boolean spiVersionCheck = config.isSkipTrinoVersionValidation();
+    if (spiVersionCheck) {
+      if (trinoVersion < getMinSupportTrinoSpiVersion()
+          || trinoVersion > getMaxSupportTrinoSpiVersion()) {
+        LOG.warn(
+            "The version {} has not undergone thorough testing with Gravitino, 
there may be compatibility problem.",
+            trinoVersion);
+      }
+      return;
+    }
+
+    // version validation
+    if (trinoVersion < getMinSupportTrinoSpiVersion()
+        || trinoVersion > getMaxSupportTrinoSpiVersion()) {
+      String errmsg =
+          String.format(
+              "Unsupported Trino-%s version. The Supported version for the 
Gravitino-Trino-connector from Trino-%d to Trino-%d."
+                  + "Maybe you can set gravitino.trino.skip-version-validation 
to skip version validation.",
+              trinoVersion, getMinSupportTrinoSpiVersion(), 
getMaxSupportTrinoSpiVersion());
+      throw new 
TrinoException(GravitinoErrorCode.GRAVITINO_UNSUPPORTED_TRINO_VERSION, errmsg);
+    }
+  }
+
+  protected boolean supportCatalogNameWithMetalake() {
+    return true;
+  }
+
+  protected int getMinSupportTrinoSpiVersion() {
+    return MIN_SUPPORT_TRINO_SPI_VERSION;
+  }
+
+  protected int getMaxSupportTrinoSpiVersion() {
+    return MAX_SUPPORT_TRINO_SPI_VERSION;
+  }
+
+  @SuppressWarnings("deprecation")
+  protected boolean isCoordinator(ConnectorContext connectorContext) {
+    return connectorContext.getNodeManager().getCurrentNode().isCoordinator();
   }
 
   private CatalogConnectorFactory 
createCatalogConnectorFactory(GravitinoConfig config) {
@@ -145,4 +228,8 @@ public class GravitinoConnectorFactory implements 
ConnectorFactory {
           GRAVITINO_RUNTIME_ERROR, "Can not create CatalogConnectorFactory ", 
e);
     }
   }
+
+  public int getTrinoVersion() {
+    return trinoVersion;
+  }
 }
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
index eadb9d5bde..5bd0b0e607 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
@@ -23,7 +23,6 @@ import static 
org.apache.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import io.airlift.slice.Slice;
 import io.trino.spi.TrinoException;
 import io.trino.spi.connector.AggregateFunction;
 import io.trino.spi.connector.AggregationApplicationResult;
@@ -31,9 +30,7 @@ import io.trino.spi.connector.Assignment;
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ColumnMetadata;
 import io.trino.spi.connector.ConnectorInsertTableHandle;
-import io.trino.spi.connector.ConnectorMergeTableHandle;
 import io.trino.spi.connector.ConnectorMetadata;
-import io.trino.spi.connector.ConnectorOutputMetadata;
 import io.trino.spi.connector.ConnectorPartitioningHandle;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorTableHandle;
@@ -58,10 +55,8 @@ import io.trino.spi.expression.ConnectorExpression;
 import io.trino.spi.expression.Constant;
 import io.trino.spi.security.TrinoPrincipal;
 import io.trino.spi.statistics.ColumnStatistics;
-import io.trino.spi.statistics.ComputedStatistics;
 import io.trino.spi.statistics.TableStatistics;
 import io.trino.spi.type.Type;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -71,7 +66,6 @@ import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
 import 
org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
-import org.apache.gravitino.trino.connector.metadata.GravitinoColumn;
 import org.apache.gravitino.trino.connector.metadata.GravitinoSchema;
 import org.apache.gravitino.trino.connector.metadata.GravitinoTable;
 
@@ -80,18 +74,18 @@ import 
org.apache.gravitino.trino.connector.metadata.GravitinoTable;
  * server. It also transforms the different metadata formats between Trino and 
Gravitino.
  * Additionally, it wraps the internal connector metadata for accessing data.
  */
-public class GravitinoMetadata implements ConnectorMetadata {
+public abstract class GravitinoMetadata implements ConnectorMetadata {
 
   // The column handle name that will generate row IDs for the merge operation.
   public static final String MERGE_ROW_ID = "$row_id";
 
   // Handling metadata operations on gravitino server
-  private final CatalogConnectorMetadata catalogConnectorMetadata;
+  protected final CatalogConnectorMetadata catalogConnectorMetadata;
 
   // Transform different metadata format
-  private final CatalogConnectorMetadataAdapter metadataAdapter;
+  protected final CatalogConnectorMetadataAdapter metadataAdapter;
 
-  private final ConnectorMetadata internalMetadata;
+  protected final ConnectorMetadata internalMetadata;
 
   /**
    * Constructs a new GravitinoMetadata instance.
@@ -253,16 +247,6 @@ public class GravitinoMetadata implements 
ConnectorMetadata {
     return new GravitinoInsertTableHandle(insertTableHandle);
   }
 
-  @Override
-  public Optional<ConnectorOutputMetadata> finishInsert(
-      ConnectorSession session,
-      ConnectorInsertTableHandle insertHandle,
-      Collection<Slice> fragments,
-      Collection<ComputedStatistics> computedStatistics) {
-    return internalMetadata.finishInsert(
-        session, GravitinoHandle.unWrap(insertHandle), fragments, 
computedStatistics);
-  }
-
   @Override
   public void renameSchema(ConnectorSession session, String source, String 
target) {
     catalogConnectorMetadata.renameSchema(source, target);
@@ -293,13 +277,6 @@ public class GravitinoMetadata implements 
ConnectorMetadata {
     catalogConnectorMetadata.setTableProperties(getTableName(tableHandle), 
allProps);
   }
 
-  @Override
-  public void addColumn(
-      ConnectorSession session, ConnectorTableHandle tableHandle, 
ColumnMetadata column) {
-    GravitinoColumn gravitinoColumn = metadataAdapter.createColumn(column);
-    catalogConnectorMetadata.addColumn(getTableName(tableHandle), 
gravitinoColumn);
-  }
-
   @Override
   public void dropColumn(
       ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle 
column) {
@@ -616,27 +593,6 @@ public class GravitinoMetadata implements 
ConnectorMetadata {
     return updateLayout.map(GravitinoPartitioningHandle::new);
   }
 
-  @Override
-  public ConnectorMergeTableHandle beginMerge(
-      ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode 
retryMode) {
-    ConnectorMergeTableHandle connectorMergeTableHandle =
-        internalMetadata.beginMerge(session, 
GravitinoHandle.unWrap(tableHandle), retryMode);
-    SchemaTableName tableName = getTableName(tableHandle);
-
-    return new GravitinoMergeTableHandle(
-        tableName.getSchemaName(), tableName.getTableName(), 
connectorMergeTableHandle);
-  }
-
-  @Override
-  public void finishMerge(
-      ConnectorSession session,
-      ConnectorMergeTableHandle mergeTableHandle,
-      Collection<Slice> fragments,
-      Collection<ComputedStatistics> computedStatistics) {
-    internalMetadata.finishMerge(
-        session, GravitinoHandle.unWrap(mergeTableHandle), fragments, 
computedStatistics);
-  }
-
   @Override
   public Optional<ConnectorTableHandle> applyUpdate(
       ConnectorSession session,
@@ -702,7 +658,7 @@ public class GravitinoMetadata implements ConnectorMetadata 
{
                     : new ConnectorTableLayout(result.getPartitionColumns()));
   }
 
-  private SchemaTableName getTableName(ConnectorTableHandle tableHandle) {
+  protected SchemaTableName getTableName(ConnectorTableHandle tableHandle) {
     return ((GravitinoTableHandle) tableHandle).toSchemaTableName();
   }
 
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoNodePartitioningProvider.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoNodePartitioningProvider.java
index 8226b70d14..1780fbfa11 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoNodePartitioningProvider.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoNodePartitioningProvider.java
@@ -23,12 +23,10 @@ import io.trino.spi.connector.ConnectorBucketNodeMap;
 import io.trino.spi.connector.ConnectorNodePartitioningProvider;
 import io.trino.spi.connector.ConnectorPartitioningHandle;
 import io.trino.spi.connector.ConnectorSession;
-import io.trino.spi.connector.ConnectorSplit;
 import io.trino.spi.connector.ConnectorTransactionHandle;
 import io.trino.spi.type.Type;
 import java.util.List;
 import java.util.Optional;
-import java.util.function.ToIntFunction;
 
 /**
  * This class provides a ConnectorNodePartitioningProvider for Trino to decide 
data partitioning
@@ -59,17 +57,6 @@ public class GravitinoNodePartitioningProvider implements 
ConnectorNodePartition
         GravitinoHandle.unWrap(partitioningHandle));
   }
 
-  @Override
-  public ToIntFunction<ConnectorSplit> getSplitBucketFunction(
-      ConnectorTransactionHandle transactionHandle,
-      ConnectorSession session,
-      ConnectorPartitioningHandle partitioningHandle) {
-    return nodePartitioningProvider.getSplitBucketFunction(
-        GravitinoHandle.unWrap(transactionHandle),
-        session,
-        GravitinoHandle.unWrap(partitioningHandle));
-  }
-
   @Override
   public BucketFunction getBucketFunction(
       ConnectorTransactionHandle transactionHandle,
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPlugin.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPlugin.java
index 2b4332845c..80b4b96b66 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPlugin.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPlugin.java
@@ -18,15 +18,41 @@
  */
 package org.apache.gravitino.trino.connector;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import io.trino.spi.Plugin;
 import io.trino.spi.connector.ConnectorFactory;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager;
 
 /** Trino plugin endpoint, using java spi mechanism */
 public class GravitinoPlugin implements Plugin {
+  private GravitinoConnectorFactory factory;
+  protected GravitinoAdminClient client;
+
+  public GravitinoPlugin() {}
+
+  public GravitinoPlugin(GravitinoAdminClient client) {
+    this.client = client;
+  }
 
   @Override
   public Iterable<ConnectorFactory> getConnectorFactories() {
-    return ImmutableList.of(new GravitinoConnectorFactory());
+    factory = createConnectorFactory(client);
+    return ImmutableList.of(factory);
+  }
+
+  protected GravitinoConnectorFactory 
createConnectorFactory(GravitinoAdminClient client) {
+    return new GravitinoConnectorFactory(client);
+  }
+
+  @VisibleForTesting
+  CatalogConnectorManager getCatalogConnectorManager() {
+    return factory.getCatalogConnectorManager();
+  }
+
+  @VisibleForTesting
+  int getTrinoVersion() {
+    return factory.getTrinoVersion();
   }
 }
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplit.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplit.java
index 6e020dd103..9d70576cbe 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplit.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplit.java
@@ -31,7 +31,7 @@ import java.util.List;
  * The GravitinoFTransactionHandle is used to make Apache Gravitino metadata 
operations
  * transactional and wrap the inner connector transaction for data access.
  */
-public class GravitinoSplit implements ConnectorSplit, 
GravitinoHandle<ConnectorSplit> {
+public abstract class GravitinoSplit implements ConnectorSplit, 
GravitinoHandle<ConnectorSplit> {
 
   private HandleWrapper<ConnectorSplit> handleWrapper = new 
HandleWrapper<>(ConnectorSplit.class);
 
@@ -75,11 +75,6 @@ public class GravitinoSplit implements ConnectorSplit, 
GravitinoHandle<Connector
     return handleWrapper.getHandle().getAddresses();
   }
 
-  @Override
-  public Object getInfo() {
-    return handleWrapper.getHandle().getInfo();
-  }
-
   @Override
   public SplitWeight getSplitWeight() {
     return handleWrapper.getHandle().getSplitWeight();
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitManager.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitManager.java
index 97f1fcc9ba..d3bdb1ecf0 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitManager.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitManager.java
@@ -18,6 +18,9 @@
  */
 package org.apache.gravitino.trino.connector;
 
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+
+import io.trino.spi.TrinoException;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorSplitManager;
 import io.trino.spi.connector.ConnectorSplitSource;
@@ -53,6 +56,10 @@ public class GravitinoSplitManager implements 
ConnectorSplitManager {
             GravitinoHandle.unWrap(connectorTableHandle),
             new GravitinoDynamicFilter(dynamicFilter),
             constraint);
-    return new GravitinoSplitSource(splits);
+    return createSplitSource(splits);
+  }
+
+  protected ConnectorSplitSource createSplitSource(ConnectorSplitSource 
splits) {
+    throw new TrinoException(NOT_SUPPORTED, "Should be overridden in 
subclass");
   }
 }
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitSource.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitSource.java
index 9e8af76b5a..aaa3af89f9 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitSource.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitSource.java
@@ -18,6 +18,9 @@
  */
 package org.apache.gravitino.trino.connector;
 
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+
+import io.trino.spi.TrinoException;
 import io.trino.spi.connector.ConnectorSplit;
 import io.trino.spi.connector.ConnectorSplitSource;
 import java.util.List;
@@ -49,11 +52,15 @@ public class GravitinoSplitSource implements 
ConnectorSplitSource {
         .thenApply(
             batch -> {
               List<ConnectorSplit> list =
-                  
batch.getSplits().stream().map(GravitinoSplit::new).collect(Collectors.toList());
+                  
batch.getSplits().stream().map(this::createSplit).collect(Collectors.toList());
               return new ConnectorSplitBatch(list, batch.isNoMoreSplits());
             });
   }
 
+  protected ConnectorSplit createSplit(ConnectorSplit split) {
+    throw new TrinoException(NOT_SUPPORTED, "Should be overridden in 
subclass");
+  }
+
   @Override
   public void close() {
     connectorSplitSource.close();
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorContext.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorContext.java
index 02024b1742..83a8a6124e 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorContext.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorContext.java
@@ -40,7 +40,7 @@ public class CatalogConnectorContext {
   private final GravitinoMetalake metalake;
 
   // Connector communicates with Trino
-  private final GravitinoConnector connector;
+  private GravitinoConnector connector;
 
   // Internal connector communicates with data storage
   private final Connector internalConnector;
@@ -64,8 +64,15 @@ public class CatalogConnectorContext {
     this.metalake = metalake;
     this.internalConnector = internalConnector;
     this.adapter = adapter;
+  }
 
-    this.connector = new GravitinoConnector(catalog.geNameIdentifier(), this);
+  /**
+   * Binds the Gravitino connector to this context.
+   *
+   * @param connector the Gravitino connector
+   */
+  public void bindConnector(GravitinoConnector connector) {
+    this.connector = connector;
   }
 
   /**
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java
index e1b8a7e54d..38f5290464 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java
@@ -21,7 +21,6 @@ package org.apache.gravitino.trino.connector.catalog;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.trino.spi.TrinoException;
-import io.trino.spi.connector.Connector;
 import io.trino.spi.connector.ConnectorContext;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -35,7 +34,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.client.GravitinoAdminClient;
 import org.apache.gravitino.client.GravitinoMetalake;
@@ -76,6 +75,7 @@ public class CatalogConnectorManager {
 
   private GravitinoAdminClient gravitinoClient;
   private GravitinoConfig config;
+  private TrinoCatalogNameHandler trinoCatalogNameHandler;
 
   /**
    * Constructs a new CatalogConnectorManager with the specified catalog 
register and catalog
@@ -85,10 +85,13 @@ public class CatalogConnectorManager {
    * @param catalogFactory the catalog connector factory
    */
   public CatalogConnectorManager(
-      CatalogRegister catalogRegister, CatalogConnectorFactory catalogFactory) 
{
+      CatalogRegister catalogRegister,
+      CatalogConnectorFactory catalogFactory,
+      TrinoCatalogNameHandler trinoCatalogNameHandler) {
     this.catalogRegister = catalogRegister;
     this.catalogConnectorFactory = catalogFactory;
     this.executorService = createScheduledThreadPoolExecutor();
+    this.trinoCatalogNameHandler = trinoCatalogNameHandler;
   }
 
   private static ScheduledThreadPoolExecutor 
createScheduledThreadPoolExecutor() {
@@ -127,19 +130,15 @@ public class CatalogConnectorManager {
   /**
    * Starts the catalog connector manager with the specified Trino connector 
context.
    *
-   * @param context the Trino connector context
    * @throws Exception if the catalog connector manager fails to start
    */
-  public void start(ConnectorContext context) throws Exception {
-    catalogRegister.init(context, config);
-    if (catalogRegister.isCoordinator()) {
-      executorService.scheduleWithFixedDelay(
-          this::loadMetalake,
-          metadataUpdateIntervalSecond,
-          metadataUpdateIntervalSecond,
-          TimeUnit.SECONDS);
-    }
-
+  public void start() throws Exception {
+    catalogRegister.init(config);
+    executorService.scheduleWithFixedDelay(
+        this::loadMetalake,
+        metadataUpdateIntervalSecond,
+        metadataUpdateIntervalSecond,
+        TimeUnit.SECONDS);
     LOG.info("Gravitino CatalogConnectorManager started.");
   }
 
@@ -330,7 +329,15 @@ public class CatalogConnectorManager {
   /** Shuts down the catalog connector manager. */
   public void shutdown() {
     LOG.info("Gravitino CatalogConnectorManager shutdown.");
-    throw new NotImplementedException();
+    if (catalogRegister != null) {
+      catalogRegister.close();
+    }
+
+    executorService.shutdown();
+
+    if (gravitinoClient != null) {
+      gravitinoClient.close();
+    }
   }
 
   /**
@@ -341,7 +348,9 @@ public class CatalogConnectorManager {
    * @return the Trino catalog name
    */
   public String getTrinoCatalogName(String metalake, String catalog) {
-    return config.singleMetalakeMode() ? catalog : String.format("\"%s.%s\"", 
metalake, catalog);
+    return config.singleMetalakeMode()
+        ? catalog
+        : trinoCatalogNameHandler.getCatalogName(metalake, catalog);
   }
 
   /**
@@ -369,14 +378,21 @@ public class CatalogConnectorManager {
    * @param connectorName the name of the connector
    * @param config the Gravitino configuration
    * @param context the Trino connector context
-   * @return the created connector
+   * @return the created catalog connector context
    */
-  public Connector createConnector(
+  public CatalogConnectorContext createCatalogConnectorContext(
       String connectorName, GravitinoConfig config, ConnectorContext context) {
     try {
       String catalogConfig = config.getCatalogConfig();
 
       GravitinoCatalog catalog = GravitinoCatalog.fromJson(catalogConfig);
+      if (this.config.singleMetalakeMode()
+          && StringUtils.isNotBlank(targetMetalake)
+          && !targetMetalake.equals(catalog.getMetalake())) {
+        throw new TrinoException(
+            GravitinoErrorCode.GRAVITINO_UNSUPPORTED_OPERATION,
+            "Multiple metalakes are not supported");
+      }
       CatalogConnectorContext.Builder builder =
           
catalogConnectorFactory.createCatalogConnectorContextBuilder(catalog);
       builder
@@ -384,9 +400,10 @@ public class CatalogConnectorManager {
           .withContext(context);
 
       CatalogConnectorContext connectorContext = builder.build();
-      catalogConnectors.put(connectorName, connectorContext);
+      String fullCatalogName = getTrinoCatalogName(catalog);
+      catalogConnectors.put(fullCatalogName, connectorContext);
       LOG.info("Create connector {} successful", connectorName);
-      return connectorContext.getConnector();
+      return connectorContext;
     } catch (Exception e) {
       LOG.error("Failed to create connector: {}", connectorName, e);
       throw new TrinoException(
@@ -433,4 +450,8 @@ public class CatalogConnectorManager {
     }
     return false;
   }
+
+  public interface TrinoCatalogNameHandler {
+    String getCatalogName(String metalake, String catalog);
+  }
 }
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java
index a66bd2aed6..3e2727ac2b 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java
@@ -326,6 +326,27 @@ public class CatalogConnectorMetadata {
     }
   }
 
+  /**
+   * Adds a new column to the specified table with a position hint.
+   *
+   * @param schemaTableName the name of the schema and table
+   * @param column the Gravitino column to add
+   * @param position the target position for the new column
+   */
+  public void addColumn(
+      SchemaTableName schemaTableName,
+      GravitinoColumn column,
+      TableChange.ColumnPosition position) {
+    String[] columnNames = {column.getName()};
+    String comment = Strings.isNullOrEmpty(column.getComment()) ? null : 
column.getComment();
+    TableChange.ColumnPosition targetPosition =
+        position == null ? TableChange.ColumnPosition.defaultPos() : position;
+    applyAlter(
+        schemaTableName,
+        TableChange.addColumn(
+            columnNames, column.getType(), comment, targetPosition, 
column.isNullable()));
+  }
+
   /**
    * Drops a column from the specified table.
    *
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogRegister.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogRegister.java
index 5e26bf700d..0b79d1af33 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogRegister.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogRegister.java
@@ -23,7 +23,6 @@ import static 
org.apache.gravitino.trino.connector.GravitinoConfig.GRAVITINO_DYN
 
 import io.trino.jdbc.TrinoDriver;
 import io.trino.spi.TrinoException;
-import io.trino.spi.connector.ConnectorContext;
 import java.io.File;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -47,61 +46,14 @@ public class CatalogRegister {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(CatalogRegister.class);
 
-  private static final int MIN_SUPPORT_TRINO_SPI_VERSION = 435;
-  private static final int MAX_SUPPORT_TRINO_SPI_VERSION = 439;
-  private static final int 
MIN_SUPPORT_CATALOG_NAME_WITH_METALAKE_TRINO_SPI_VERSION = 446;
   private static final int EXECUTE_QUERY_MAX_RETRIES = 6;
   private static final int EXECUTE_QUERY_BACKOFF_TIME_SECOND = 5;
 
-  private String trinoVersion;
   private Connection connection;
-  private boolean isCoordinator;
   private boolean isStarted = false;
   private String catalogStoreDirectory;
   private GravitinoConfig config;
 
-  private void checkTrinoSpiVersion(ConnectorContext context) {
-    this.trinoVersion = context.getSpiVersion();
-
-    int version = Integer.parseInt(trinoVersion);
-
-    if (version < MIN_SUPPORT_TRINO_SPI_VERSION || version > 
MAX_SUPPORT_TRINO_SPI_VERSION) {
-      Boolean skipTrinoVersionValidation = 
config.isSkipTrinoVersionValidation();
-      if (!skipTrinoVersionValidation) {
-        String errmsg =
-            String.format(
-                "Unsupported Trino-%s version. The Supported version for the 
Gravitino-Trino-connector from Trino-%d to Trino-%d."
-                    + "Maybe you can set 
gravitino.trino.skip-version-validation to skip version validation.",
-                trinoVersion, MIN_SUPPORT_TRINO_SPI_VERSION, 
MAX_SUPPORT_TRINO_SPI_VERSION);
-        throw new 
TrinoException(GravitinoErrorCode.GRAVITINO_UNSUPPORTED_TRINO_VERSION, errmsg);
-      } else {
-        LOG.warn(
-            "The version %s has not undergone thorough testing with Gravitino, 
there may be compatiablity problem.",
-            trinoVersion);
-      }
-    }
-
-    isCoordinator = context.getNodeManager().getCurrentNode().isCoordinator();
-  }
-
-  private void checkSupportCatalogNameWithMetalake(
-      ConnectorContext context, GravitinoConfig config) {
-    if (!config.singleMetalakeMode()) {
-      int version = Integer.parseInt(context.getSpiVersion());
-      if (version < MIN_SUPPORT_CATALOG_NAME_WITH_METALAKE_TRINO_SPI_VERSION) {
-        LOG.warn(
-            "Trino-{} does not support catalog name with dots, The minimal 
required version is Trino-{}."
-                + "Some errors may occur when using the USE <CATALOG>.<SCHEMA> 
statement in Trino",
-            trinoVersion,
-            MIN_SUPPORT_CATALOG_NAME_WITH_METALAKE_TRINO_SPI_VERSION);
-      }
-    }
-  }
-
-  boolean isCoordinator() {
-    return isCoordinator;
-  }
-
   boolean isTrinoStarted() {
     if (isStarted) {
       return true;
@@ -121,14 +73,11 @@ public class CatalogRegister {
    * Initializes the catalog register with the specified Trino connector 
context and Gravitino
    * configuration.
    *
-   * @param context the Trino connector context
    * @param config the Gravitino configuration
    * @throws Exception if the catalog register fails to initialize
    */
-  public void init(ConnectorContext context, GravitinoConfig config) throws 
Exception {
+  public void init(GravitinoConfig config) throws Exception {
     this.config = config;
-    checkTrinoSpiVersion(context);
-    checkSupportCatalogNameWithMetalake(context, config);
 
     TrinoDriver driver = new TrinoDriver();
     DriverManager.registerDriver(driver);
@@ -203,6 +152,8 @@ public class CatalogRegister {
       String createCatalogCommand = generateCreateCatalogCommand(name, 
catalog);
       executeSql(createCatalogCommand);
       LOG.info("Register catalog {} successfully: {}", name, 
createCatalogCommand);
+    } catch (SQLException e) {
+      throw new TrinoException(GravitinoErrorCode.GRAVITINO_RUNTIME_ERROR, 
e.getMessage(), e);
     } catch (Exception e) {
       String message = String.format("Failed to register catalog %s", name);
       LOG.error(message);
@@ -211,7 +162,7 @@ public class CatalogRegister {
   }
 
   private boolean checkCatalogExist(String name) {
-    String showCatalogCommand = String.format("SHOW CATALOGS like '%s'", name);
+    String showCatalogCommand = "SHOW CATALOGS";
     Exception failedException = null;
     try {
       int retries = EXECUTE_QUERY_MAX_RETRIES;
@@ -222,11 +173,17 @@ public class CatalogRegister {
           ResultSet rs = statement.getResultSet();
           while (rs.next()) {
             String catalogName = rs.getString(1);
-            if (catalogName.equals(name) || catalogName.equals("\"" + name + 
"\"")) {
+            // In some Trino version catalog name may be quoted, so we need to 
check both quoted and
+            // unquoted names
+            if (name.equals(catalogName)
+                || name.equals("\"" + catalogName + "\"")
+                || ("\"" + name + "\"").equals(catalogName)) {
               return true;
             }
           }
           return false;
+        } catch (SQLException e) {
+          throw e;
         } catch (Exception e) {
           failedException = e;
           LOG.warn("Execute command failed: {}, ", showCatalogCommand, e);
@@ -252,6 +209,8 @@ public class CatalogRegister {
           // check the catalog is already created
           statement.execute(sql);
           return;
+        } catch (SQLException e) {
+          throw e;
         } catch (Exception e) {
           failedException = e;
           LOG.warn("Execute command failed: {}, ", sql, e);
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/system/GravitinoSystemConnector.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/system/GravitinoSystemConnector.java
index c6b894170c..57ee95c0ea 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/system/GravitinoSystemConnector.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/system/GravitinoSystemConnector.java
@@ -18,10 +18,13 @@
  */
 package org.apache.gravitino.trino.connector.system;
 
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import io.trino.spi.HostAddress;
 import io.trino.spi.Page;
+import io.trino.spi.TrinoException;
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.Connector;
 import io.trino.spi.connector.ConnectorMetadata;
@@ -79,16 +82,30 @@ public class GravitinoSystemConnector implements Connector {
   @Override
   public ConnectorMetadata getMetadata(
       ConnectorSession session, ConnectorTransactionHandle transactionHandle) {
+    return createMetadata();
+  }
+
+  protected ConnectorMetadata createMetadata() {
     return new GravitinoSystemConnectorMetadata();
   }
 
   @Override
   public ConnectorSplitManager getSplitManager() {
-    return new SplitManager();
+    return createSplitManager();
   }
 
   @Override
   public ConnectorPageSourceProvider getPageSourceProvider() {
+    return createPageSourceProvider();
+  }
+
+  public void shutdown() {}
+
+  protected ConnectorSplitManager createSplitManager() {
+    return new SplitManager();
+  }
+
+  protected ConnectorPageSourceProvider createPageSourceProvider() {
     return new DatasourceProvider();
   }
 
@@ -112,7 +129,11 @@ public class GravitinoSystemConnector implements Connector 
{
 
       SchemaTableName tableName =
           ((GravitinoSystemConnectorMetadata.SystemTableHandle) 
table).getName();
-      return new 
SystemTablePageSource(GravitinoSystemTableFactory.loadPageData(tableName));
+      return 
createPageSource(GravitinoSystemTableFactory.loadPageData(tableName));
+    }
+
+    protected ConnectorPageSource createPageSource(Page page) {
+      throw new TrinoException(NOT_SUPPORTED, "Should be overridden in 
subclass");
     }
   }
 
@@ -129,13 +150,18 @@ public class GravitinoSystemConnector implements 
Connector {
 
       SchemaTableName tableName =
           ((GravitinoSystemConnectorMetadata.SystemTableHandle) 
connectorTableHandle).getName();
-      return new FixedSplitSource(new Split(tableName));
+      return new FixedSplitSource(createSplit(tableName));
+    }
+
+    protected ConnectorSplit createSplit(SchemaTableName tableName) {
+      throw new TrinoException(NOT_SUPPORTED, "Should be overridden in 
subclass");
     }
   }
 
   /** The split. */
-  public static class Split implements ConnectorSplit {
-    private final SchemaTableName tableName;
+  public abstract static class Split implements ConnectorSplit {
+
+    protected final SchemaTableName tableName;
 
     /**
      * Constructs a new Split with the specified table name.
@@ -166,18 +192,13 @@ public class GravitinoSystemConnector implements 
Connector {
     public List<HostAddress> getAddresses() {
       return Collections.emptyList();
     }
-
-    @Override
-    public Object getInfo() {
-      return this;
-    }
   }
 
   /** The system table page source. */
-  public static class SystemTablePageSource implements ConnectorPageSource {
+  public abstract static class SystemTablePageSource implements 
ConnectorPageSource {
 
-    private boolean isFinished = false;
-    private final Page page;
+    protected boolean isFinished = false;
+    protected final Page page;
 
     /**
      * Constructs a new SystemTablePageSource.
@@ -203,8 +224,7 @@ public class GravitinoSystemConnector implements Connector {
       return isFinished;
     }
 
-    @Override
-    public Page getNextPage() {
+    public Page nextPage() {
       if (isFinished) {
         return null;
       }
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/BlockJsonSerde.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/BlockJsonSerde.java
index 2ed83b1be8..e100665668 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/BlockJsonSerde.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/BlockJsonSerde.java
@@ -37,6 +37,7 @@ import java.lang.reflect.Method;
  */
 public final class BlockJsonSerde {
   private static final String BLOCK_SERDE_UTIL_CLASS_NAME = 
"io.trino.block.BlockSerdeUtil";
+  private static final int DEFAULT_BLOCK_ENCODING_NAME_LENGTH = 64;
 
   /**
    * Jackson serializer for Trino {@link Block} objects. Serializes block 
instances using Trino's
@@ -65,11 +66,13 @@ public final class BlockJsonSerde {
     public void serialize(
         Block block, JsonGenerator jsonGenerator, SerializerProvider 
serializerProvider)
         throws IOException {
-      //  Encoding name is length prefixed as are many block encodings
-      SliceOutput output =
-          new DynamicSliceOutput(
-              toIntExact(
-                  block.getSizeInBytes() + block.getEncodingName().length() + 
(2 * Integer.BYTES)));
+      // estimate size to avoid multiple resizes, encoding name is length 
prefixed as are many block
+      // like : SIZE_OF_INT + blockEncoding.getName().length() + 
block.getSizeInBytes();
+      // we use a default length for encoding name to avoid calculating actual 
length
+      int estimatedSize =
+          toIntExact(
+              block.getSizeInBytes() + (2 * Integer.BYTES) + 
DEFAULT_BLOCK_ENCODING_NAME_LENGTH);
+      SliceOutput output = new DynamicSliceOutput(estimatedSize);
 
       try {
         writeBlock.invoke(null, blockEncodingSerde, output, block);
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/AbstractGravitinoConnectorTest.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/AbstractGravitinoConnectorTest.java
new file mode 100644
index 0000000000..ac0d223cb3
--- /dev/null
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/AbstractGravitinoConnectorTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector;
+
+import io.trino.plugin.memory.MemoryPlugin;
+import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.DistributedQueryRunner;
+import io.trino.testing.QueryRunner;
+import java.util.concurrent.TimeUnit;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager;
+import org.awaitility.Awaitility;
+
+abstract class AbstractGravitinoConnectorTest extends 
AbstractTestQueryFramework {
+
+  GravitinoMockServer server;
+  int trinoVersion;
+
+  @Override
+  protected QueryRunner createQueryRunner() throws Exception {
+    GravitinoAdminClient gravitinoClient = createGravitinoClient();
+    try {
+
+      DistributedQueryRunner queryRunner = createTrinoQueryRunner();
+
+      GravitinoPlugin gravitinoPlugin = createGravitinoPlugin(gravitinoClient);
+      queryRunner.installPlugin(gravitinoPlugin);
+
+      configureCatalogs(queryRunner, gravitinoClient);
+
+      
GravitinoConnectorPluginManager.instance(this.getClass().getClassLoader())
+          .installPlugin("memory", new MemoryPlugin());
+      CatalogConnectorManager catalogConnectorManager =
+          gravitinoPlugin.getCatalogConnectorManager();
+      trinoVersion = gravitinoPlugin.getTrinoVersion();
+      server.setCatalogConnectorManager(catalogConnectorManager, trinoVersion);
+
+      // Wait for the catalog to be created. Wait for at least 30 seconds.
+      Awaitility.await()
+          .atMost(30, TimeUnit.SECONDS)
+          .pollInterval(1, TimeUnit.SECONDS)
+          .until(() -> !catalogConnectorManager.getCatalogs().isEmpty());
+
+      return queryRunner;
+    } catch (Exception e) {
+      throw new RuntimeException("Create query runner failed", e);
+    }
+  }
+
+  protected GravitinoPlugin createGravitinoPlugin(GravitinoAdminClient client) 
{
+    return new GravitinoPlugin(client);
+  }
+
+  protected abstract DistributedQueryRunner createTrinoQueryRunner() throws 
Exception;
+
+  protected GravitinoAdminClient createGravitinoClient() {
+    server = closeAfterClass(new GravitinoMockServer());
+    return server.createGravitinoClient();
+  }
+
+  protected abstract void configureCatalogs(
+      DistributedQueryRunner queryRunner, GravitinoAdminClient 
gravitinoClient) throws Exception;
+}
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/GravitinoMockServer.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/GravitinoMockServer.java
index b76eea855f..2f8097f88b 100644
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/GravitinoMockServer.java
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/GravitinoMockServer.java
@@ -19,6 +19,7 @@
 package org.apache.gravitino.trino.connector;
 
 import static java.util.Collections.emptyMap;
+import static 
org.apache.gravitino.trino.connector.TestGravitinoConnector.SPI_VERSION_SUPPORT_ADD_COLUMN_WITH_POSITION;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyMap;
@@ -30,7 +31,9 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import io.trino.plugin.memory.MemoryConnector;
 import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ColumnMetadata;
 import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.ConnectorTableMetadata;
 import io.trino.spi.connector.SaveMode;
@@ -79,19 +82,32 @@ public class GravitinoMockServer implements AutoCloseable {
   private boolean start = true;
   CatalogConnectorManager catalogConnectorManager;
   private GeneralDataTypeTransformer dataTypeTransformer = new 
HiveDataTypeTransformer();
+  private int trinoVersion = 0;
 
   public GravitinoMockServer() {
     createMetalake(testMetalake);
     createCatalog(testMetalake, testCatalog, ImmutableMap.of());
   }
 
-  public void setCatalogConnectorManager(CatalogConnectorManager 
catalogConnectorManager) {
+  public void setCatalogConnectorManager(
+      CatalogConnectorManager catalogConnectorManager, int trinoVersion) {
     this.catalogConnectorManager = catalogConnectorManager;
+    this.trinoVersion = trinoVersion;
   }
 
   public GravitinoAdminClient createGravitinoClient() {
     GravitinoAdminClient client = mock(GravitinoAdminClient.class);
 
+    when(client.listMetalakes())
+        .thenAnswer(
+            new Answer<GravitinoMetalake[]>() {
+              @Override
+              public GravitinoMetalake[] answer(InvocationOnMock invocation) 
throws Throwable {
+                return metalakes.values().stream()
+                    .map(metalake -> metalake.metalake)
+                    .toArray(GravitinoMetalake[]::new);
+              }
+            });
     when(client.createMetalake(anyString(), anyString(), anyMap()))
         .thenAnswer(
             new Answer<GravitinoMetalake>() {
@@ -558,7 +574,7 @@ public class GravitinoMockServer implements AutoCloseable {
           catalogConnectorManager
               
.getCatalogConnector(catalogConnectorManager.getTrinoCatalogName(catalog))
               .getMetadataAdapter();
-      metadata.addColumn(null, tableHandle, 
metadataAdapter.getColumnMetadata(column));
+      addColumn(metadata, tableHandle, 
metadataAdapter.getColumnMetadata(column));
 
     } else if (tableChange instanceof TableChange.DeleteColumn) {
       TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) 
tableChange;
@@ -603,6 +619,42 @@ public class GravitinoMockServer implements AutoCloseable {
     }
   }
 
+  private void addColumn(
+      ConnectorMetadata metadata, ConnectorTableHandle tableHandle, 
ColumnMetadata columnMetadata) {
+    if (trinoVersion < SPI_VERSION_SUPPORT_ADD_COLUMN_WITH_POSITION) {
+      try {
+        metadata
+            .getClass()
+            .getMethod(
+                "addColumn",
+                ConnectorSession.class,
+                ConnectorTableHandle.class,
+                ColumnMetadata.class)
+            .invoke(metadata, null, tableHandle, columnMetadata);
+      } catch (ReflectiveOperationException e) {
+        throw new RuntimeException("Failed to invoke legacy addColumn by 
reflection", e);
+      }
+    } else {
+      try {
+        Class<?> columnPositionClass = 
Class.forName("io.trino.spi.connector.ColumnPosition");
+        Class<?> lastPositionClass = 
Class.forName("io.trino.spi.connector.ColumnPosition$Last");
+        Object position = 
lastPositionClass.getDeclaredConstructor().newInstance();
+
+        metadata
+            .getClass()
+            .getMethod(
+                "addColumn",
+                ConnectorSession.class,
+                ConnectorTableHandle.class,
+                ColumnMetadata.class,
+                columnPositionClass)
+            .invoke(metadata, null, tableHandle, columnMetadata, position);
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to invoke addColumn with 
ColumnPosition", e);
+      }
+    }
+  }
+
   @ResourcePresence
   public boolean isRunning() {
     return start;
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestCreateGravitinoConnector.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestCreateGravitinoConnector.java
deleted file mode 100644
index 2845d3c604..0000000000
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestCreateGravitinoConnector.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.trino.connector;
-
-import static io.trino.testing.TestingSession.testSessionBuilder;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import io.trino.Session;
-import io.trino.testing.DistributedQueryRunner;
-import io.trino.testing.QueryRunner;
-import java.util.HashMap;
-import org.apache.gravitino.client.GravitinoAdminClient;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-
-@Disabled
-public class TestCreateGravitinoConnector {
-
-  GravitinoMockServer server;
-
-  @Test
-  public void testCreateConnectorsWithEnableSimpleCatalog() throws Exception {
-    server = new GravitinoMockServer();
-    Session session = testSessionBuilder().setCatalog("gravitino").build();
-    QueryRunner queryRunner = 
DistributedQueryRunner.builder(session).setNodeCount(1).build();
-
-    GravitinoAdminClient gravitinoClient = server.createGravitinoClient();
-    TestGravitinoPlugin gravitinoPlugin = new 
TestGravitinoPlugin(gravitinoClient);
-    queryRunner.installPlugin(gravitinoPlugin);
-
-    // test create two connector and set gravitino.simplify-catalog-names = 
true
-    {
-      // create a gravitino connector named gravitino using metalake test
-      HashMap<String, String> properties = new HashMap<>();
-      properties.put("gravitino.metalake", "test");
-      properties.put("gravitino.uri", "http://127.0.0.1:8090";);
-      properties.put("gravitino.simplify-catalog-names", "true");
-      queryRunner.createCatalog("test0", "gravitino", properties);
-    }
-
-    {
-      // Test failed to create catalog with different metalake
-      HashMap<String, String> properties = new HashMap<>();
-      properties.put("gravitino.metalake", "test1");
-      properties.put("gravitino.uri", "http://127.0.0.1:8090";);
-      properties.put("gravitino.simplify-catalog-names", "true");
-      try {
-        queryRunner.createCatalog("test1", "gravitino", properties);
-      } catch (Exception e) {
-        assertThat(e.getMessage()).contains("Multiple metalakes are not 
supported");
-      }
-    }
-
-    server.close();
-  }
-
-  @Test
-  public void testCreateConnectorsWithDisableSimpleCatalog() throws Exception {
-    server = new GravitinoMockServer();
-    Session session = testSessionBuilder().setCatalog("gravitino").build();
-    QueryRunner queryRunner = 
DistributedQueryRunner.builder(session).setNodeCount(1).build();
-
-    GravitinoAdminClient gravitinoClient = server.createGravitinoClient();
-    TestGravitinoPlugin gravitinoPlugin = new 
TestGravitinoPlugin(gravitinoClient);
-    queryRunner.installPlugin(gravitinoPlugin);
-
-    // test create two connector and set gravitino.simplify-catalog-names = 
false
-    {
-      // create a gravitino connector named gravitino using metalake test
-      HashMap<String, String> properties = new HashMap<>();
-      properties.put("gravitino.metalake", "test");
-      properties.put("gravitino.uri", "http://127.0.0.1:8090";);
-      properties.put("gravitino.simplify-catalog-names", "false");
-      queryRunner.createCatalog("test0", "gravitino", properties);
-    }
-
-    {
-      // Test failed to create catalog with different metalake
-      HashMap<String, String> properties = new HashMap<>();
-      properties.put("gravitino.metalake", "test1");
-      properties.put("gravitino.uri", "http://127.0.0.1:8090";);
-      properties.put("gravitino.simplify-catalog-names", "false");
-      queryRunner.createCatalog("test1", "gravitino", properties);
-    }
-
-    server.close();
-  }
-}
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConfig.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConfig.java
index e4a8c9e5fe..f79fa1bfe2 100644
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConfig.java
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConfig.java
@@ -28,18 +28,10 @@ import com.google.common.collect.ImmutableMap;
 import io.trino.spi.TrinoException;
 import java.util.Map;
 import java.util.regex.Pattern;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.jupiter.api.Test;
 
 public class TestGravitinoConfig {
 
-  @BeforeClass
-  public static void startup() throws Exception {}
-
-  @AfterClass
-  public static void shutdown() throws Exception {}
-
   @Test
   public void testGravitinoConfig() {
     String gravitinoUrl = "http://127.0.0.1:8000";;
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnector.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnector.java
index 30ac739009..99a7f803ac 100644
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnector.java
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnector.java
@@ -18,72 +18,37 @@
  */
 package org.apache.gravitino.trino.connector;
 
-import static io.trino.testing.TestingSession.testSessionBuilder;
 import static java.lang.String.format;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import com.google.common.base.Preconditions;
-import io.trino.Session;
-import io.trino.plugin.memory.MemoryPlugin;
-import io.trino.testing.AbstractTestQueryFramework;
 import io.trino.testing.DistributedQueryRunner;
 import io.trino.testing.MaterializedResult;
 import io.trino.testing.MaterializedRow;
-import io.trino.testing.QueryRunner;
 import java.util.HashMap;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.gravitino.client.GravitinoAdminClient;
-import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager;
-import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 
-public class TestGravitinoConnector extends AbstractTestQueryFramework {
+public abstract class TestGravitinoConnector extends 
AbstractGravitinoConnectorTest {
 
-  GravitinoMockServer server;
+  public static final int SPI_VERSION_TEST_SUPPORT_RENAME_COLUMN = 452;
+  public static final int SPI_VERSION_SUPPORT_ADD_COLUMN_WITH_POSITION = 468;
+  public static final int SPI_VERSION_TEST_SUPPORT_ADD_COLUMN = 469;
 
   @Override
-  protected QueryRunner createQueryRunner() throws Exception {
-    server = closeAfterClass(new GravitinoMockServer());
-    GravitinoAdminClient gravitinoClient = server.createGravitinoClient();
-
-    Session session = testSessionBuilder().setCatalog("gravitino").build();
-    try {
-
-      DistributedQueryRunner queryRunner =
-          DistributedQueryRunner.builder(session).setNodeCount(1).build();
-
-      TestGravitinoPlugin gravitinoPlugin = new 
TestGravitinoPlugin(gravitinoClient);
-      queryRunner.installPlugin(gravitinoPlugin);
-
-      // create a gravitino connector named gravitino using metalake test
-      HashMap<String, String> properties = new HashMap<>();
-      properties.put("gravitino.metalake", "test");
-      properties.put("gravitino.uri", "http://127.0.0.1:8090";);
-      properties.put(
-          "catalog.config-dir", 
queryRunner.getCoordinator().getBaseDataDir().toString());
-      properties.put("discovery.uri", 
queryRunner.getCoordinator().getBaseUrl().toString());
-      queryRunner.createCatalog("gravitino", "gravitino", properties);
-
-      
GravitinoConnectorPluginManager.instance(this.getClass().getClassLoader())
-          .installPlugin("memory", new MemoryPlugin());
-      CatalogConnectorManager catalogConnectorManager =
-          gravitinoPlugin.getCatalogConnectorManager();
-      server.setCatalogConnectorManager(catalogConnectorManager);
-
-      // Wait for the catalog to be created. Wait for at least 30 seconds.
-      Awaitility.await()
-          .atMost(30, TimeUnit.SECONDS)
-          .pollInterval(1, TimeUnit.SECONDS)
-          .until(() -> !catalogConnectorManager.getCatalogs().isEmpty());
-
-      return queryRunner;
-    } catch (Exception e) {
-      throw new RuntimeException("Create query runner failed", e);
-    }
+  protected void configureCatalogs(
+      DistributedQueryRunner queryRunner, GravitinoAdminClient 
gravitinoClient) {
+    // create a gravitino connector named gravitino using metalake test
+    HashMap<String, String> properties = new HashMap<>();
+    properties.put("gravitino.metalake", "test");
+    properties.put("gravitino.uri", "http://127.0.0.1:8090";);
+    properties.put("catalog.config-dir", 
queryRunner.getCoordinator().getBaseDataDir().toString());
+    properties.put("discovery.uri", 
queryRunner.getCoordinator().getBaseUrl().toString());
+    queryRunner.createCatalog("gravitino", "gravitino", properties);
   }
 
   @Test
@@ -189,38 +154,51 @@ public class TestGravitinoConnector extends 
AbstractTestQueryFramework {
 
     createTestTable(fullTableName1);
 
-    // test add column and drop column, but the memory connector is not 
supported these operations.
-    assertQueryFails(
-        String.format("alter table %s add column if not exists c varchar", 
fullTableName1),
-        format("This connector does not support adding columns"));
-
-    assertQueryFails(
-        String.format("alter table %s drop column a", fullTableName1),
-        format("This connector does not support dropping columns"));
-
     // test set table comment
     assertUpdate(String.format("comment on table %s is 'test table comments'", 
fullTableName1));
     assertThat((String) computeScalar("show create table " + fullTableName1))
         .contains("COMMENT 'test table comments'");
 
-    // test rename column, but the memory connector is not supported these 
operations.
-    assertQueryFails(
-        String.format("alter table %s rename column a to c ", fullTableName1),
-        format("This connector does not support renaming columns"));
-
-    assertQueryFails(
-        String.format("alter table %s alter column a set DATA TYPE int", 
fullTableName1),
-        format("This connector does not support setting column types"));
-
     // test set column comment
     assertUpdate(String.format("comment on column %s.a is 'test column 
comments'", fullTableName1));
     assertThat((String) computeScalar("show create table " + fullTableName1))
         .contains("COMMENT 'test column comments'");
 
+    // test add column and drop column, but the memory connector is not 
supported these operations.
+    if (trinoVersion < SPI_VERSION_TEST_SUPPORT_ADD_COLUMN) {
+      assertQueryFails(
+          String.format("alter table %s add column if not exists c varchar", 
fullTableName1),
+          "This connector does not support adding columns");
+    } else {
+      assertUpdate(
+          String.format("alter table %s add column if not exists c varchar", 
fullTableName1));
+      assertThat((String) computeScalar("show create table " + fullTableName1))
+          .contains("c varchar");
+    }
+
+    assertQueryFails(
+        String.format("alter table %s drop column a", fullTableName1),
+        "This connector does not support dropping columns");
+
+    // test rename column, but the memory connector is not supported these 
operations.
+    if (trinoVersion < SPI_VERSION_TEST_SUPPORT_RENAME_COLUMN) {
+      assertQueryFails(
+          String.format("alter table %s rename column b to d ", 
fullTableName1),
+          "This connector does not support renaming columns");
+    } else {
+      assertUpdate(String.format("alter table %s rename column b to d ", 
fullTableName1));
+      assertThat((String) computeScalar("show create table " + fullTableName1))
+          .contains("d integer");
+    }
+
+    assertQueryFails(
+        String.format("alter table %s alter column a set DATA TYPE int", 
fullTableName1),
+        "This connector does not support setting column types");
+
     // test set table properties, but the memory connector is not supported 
these operations.
     assertQueryFails(
         String.format("alter table %s set properties \"max_ttl\" = 20", 
fullTableName1),
-        format("This connector does not support setting table properties"));
+        "This connector does not support setting table properties");
 
     dropTestTable(fullTableName1);
   }
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorFactory.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorFactory.java
deleted file mode 100644
index 8b802d53b0..0000000000
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.trino.connector;
-
-import java.util.function.Supplier;
-import org.apache.gravitino.client.GravitinoAdminClient;
-
-public class TestGravitinoConnectorFactory extends GravitinoConnectorFactory {
-  private GravitinoAdminClient gravitinoClient;
-
-  public void setGravitinoClient(GravitinoAdminClient client) {
-    this.gravitinoClient = client;
-  }
-
-  @Override
-  Supplier<GravitinoAdminClient> clientProvider() {
-    return () -> gravitinoClient;
-  }
-}
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorNullChecks.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorNullChecks.java
index bf69565b88..1c1bbcb8b6 100644
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorNullChecks.java
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorNullChecks.java
@@ -26,16 +26,27 @@ import static org.mockito.Mockito.when;
 
 import io.trino.spi.connector.Connector;
 import io.trino.spi.transaction.IsolationLevel;
+import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.SupportsSchemas;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.rel.TableCatalog;
 import org.apache.gravitino.trino.connector.catalog.CatalogConnectorContext;
+import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog;
 import org.junit.jupiter.api.Test;
 
 class TestGravitinoConnectorNullChecks {
   @Test
   void testBeginTransactionThrowsIfInternalConnectorIsNull() {
+    GravitinoCatalog mockCatalog = mock(GravitinoCatalog.class);
+    
when(mockCatalog.geNameIdentifier()).thenReturn(NameIdentifier.of("metalake", 
"catalog"));
+
     CatalogConnectorContext mockContext = mock(CatalogConnectorContext.class);
+    when(mockContext.getCatalog()).thenReturn(mockCatalog);
+    GravitinoMetalake metalake = mockMetalake();
+    when(mockContext.getMetalake()).thenReturn(metalake);
     when(mockContext.getInternalConnector()).thenReturn(null);
-    GravitinoConnector connector = new 
GravitinoConnector(mock(NameIdentifier.class), mockContext);
+    GravitinoConnector connector = new GravitinoConnector(mockContext);
     assertThrows(
         IllegalArgumentException.class,
         () -> connector.beginTransaction(IsolationLevel.READ_COMMITTED, true, 
true));
@@ -43,14 +54,30 @@ class TestGravitinoConnectorNullChecks {
 
   @Test
   void testBeginTransactionThrowsIfInternalTransactionHandleIsNull() {
+    GravitinoCatalog mockCatalog = mock(GravitinoCatalog.class);
+    
when(mockCatalog.geNameIdentifier()).thenReturn(NameIdentifier.of("metalake", 
"catalog"));
+
     CatalogConnectorContext mockContext = mock(CatalogConnectorContext.class);
+    when(mockContext.getCatalog()).thenReturn(mockCatalog);
+    GravitinoMetalake metalake = mockMetalake();
+    when(mockContext.getMetalake()).thenReturn(metalake);
+
     Connector mockInternalConnector = mock(Connector.class);
     when(mockContext.getInternalConnector()).thenReturn(mockInternalConnector);
     when(mockInternalConnector.beginTransaction(any(), anyBoolean(), 
anyBoolean()))
         .thenReturn(null);
-    GravitinoConnector connector = new 
GravitinoConnector(mock(NameIdentifier.class), mockContext);
+    GravitinoConnector connector = new GravitinoConnector(mockContext);
     assertThrows(
         IllegalArgumentException.class,
         () -> connector.beginTransaction(IsolationLevel.READ_COMMITTED, true, 
true));
   }
+
+  private static GravitinoMetalake mockMetalake() {
+    GravitinoMetalake metalake = mock(GravitinoMetalake.class);
+    Catalog catalog = mock(Catalog.class);
+    when(catalog.asSchemas()).thenReturn(mock(SupportsSchemas.class));
+    when(catalog.asTableCatalog()).thenReturn(mock(TableCatalog.class));
+    when(metalake.loadCatalog(any())).thenReturn(catalog);
+    return metalake;
+  }
 }
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithMetalakeCatalogName.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithMetalakeCatalogName.java
index c60c5910b8..b1cef8e4cc 100644
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithMetalakeCatalogName.java
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithMetalakeCatalogName.java
@@ -18,91 +18,43 @@
  */
 package org.apache.gravitino.trino.connector;
 
-import static io.trino.testing.TestingSession.testSessionBuilder;
 import static java.lang.String.format;
 import static org.assertj.core.api.Assertions.assertThat;
 
-import io.trino.Session;
-import io.trino.plugin.memory.MemoryPlugin;
-import io.trino.testing.AbstractTestQueryFramework;
 import io.trino.testing.DistributedQueryRunner;
 import io.trino.testing.MaterializedResult;
 import io.trino.testing.MaterializedRow;
-import io.trino.testing.QueryRunner;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import org.apache.gravitino.client.GravitinoAdminClient;
-import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager;
-import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
-@Disabled
-public class TestGravitinoConnectorWithMetalakeCatalogName extends 
AbstractTestQueryFramework {
-
-  GravitinoMockServer server;
+public abstract class TestGravitinoConnectorWithMetalakeCatalogName
+    extends AbstractGravitinoConnectorTest {
 
   @Override
-  protected QueryRunner createQueryRunner() throws Exception {
-    server = closeAfterClass(new GravitinoMockServer());
-    GravitinoAdminClient gravitinoClient = server.createGravitinoClient();
-
-    Session session = testSessionBuilder().setCatalog("gravitino").build();
-
-    try {
-      DistributedQueryRunner queryRunner =
-          DistributedQueryRunner.builder(session).setNodeCount(1).build();
-
-      TestGravitinoPlugin gravitinoPlugin = new 
TestGravitinoPlugin(gravitinoClient);
-      queryRunner.installPlugin(gravitinoPlugin);
-
-      {
-        // create a gravitino connector named gravitino using metalake test
-        HashMap<String, String> properties = new HashMap<>();
-        properties.put("gravitino.metalake", "test");
-        properties.put("gravitino.uri", "http://127.0.0.1:8090";);
-        properties.put("gravitino.simplify-catalog-names", "false");
-        properties.put(
-            "trino.catalog.store", 
queryRunner.getCoordinator().getBaseDataDir().toString());
-        properties.put(
-            "trino.jdbc.uri",
-            
queryRunner.getCoordinator().getBaseUrl().toString().replace("http", 
"jdbc:trino"));
-        queryRunner.createCatalog("gravitino", "gravitino", properties);
-      }
-
-      {
-        // create a gravitino connector named test1 using metalake gravitino1
-        HashMap<String, String> properties = new HashMap<>();
-        properties.put("gravitino.metalake", "test1");
-        properties.put("gravitino.uri", "http://127.0.0.1:8090";);
-        properties.put("gravitino.simplify-catalog-names", "false");
-        properties.put(
-            "trino.catalog.store", 
queryRunner.getCoordinator().getBaseDataDir().toString());
-        properties.put(
-            "trino.jdbc.uri",
-            
queryRunner.getCoordinator().getBaseUrl().toString().replace("http", 
"jdbc:trino"));
-        queryRunner.createCatalog("gravitino1", "gravitino", properties);
-      }
-
-      
GravitinoConnectorPluginManager.instance(this.getClass().getClassLoader())
-          .installPlugin("memory", new MemoryPlugin());
-      CatalogConnectorManager catalogConnectorManager =
-          gravitinoPlugin.getCatalogConnectorManager();
-      server.setCatalogConnectorManager(catalogConnectorManager);
-
-      // Wait for the catalog to be created. Wait for at least 30 seconds.
-      Awaitility.await()
-          .atMost(30, TimeUnit.SECONDS)
-          .pollInterval(1, TimeUnit.SECONDS)
-          .until(() -> !catalogConnectorManager.getCatalogs().isEmpty());
-      return queryRunner;
-
-    } catch (Exception e) {
-      throw new RuntimeException("Create query runner failed", e);
-    }
+  protected void configureCatalogs(
+      DistributedQueryRunner queryRunner, GravitinoAdminClient 
gravitinoClient) {
+    // create a gravitino connector named gravitino using metalake test
+    HashMap<String, String> properties = new HashMap<>();
+    properties.put("gravitino.metalake", "test");
+    properties.put("gravitino.uri", "http://127.0.0.1:8090";);
+    properties.put("gravitino.use-single-metalake", "false");
+    properties.put("catalog.config-dir", 
queryRunner.getCoordinator().getBaseDataDir().toString());
+    properties.put("discovery.uri", 
queryRunner.getCoordinator().getBaseUrl().toString());
+    queryRunner.createCatalog("gravitino", "gravitino", properties);
+
+    // create a gravitino connector named test1 using metalake gravitino1
+    HashMap<String, String> secondProperties = new HashMap<>();
+    secondProperties.put("gravitino.metalake", "test1");
+    secondProperties.put("gravitino.uri", "http://127.0.0.1:8090";);
+    secondProperties.put("gravitino.use-single-metalake", "false");
+    secondProperties.put(
+        "catalog.config-dir", 
queryRunner.getCoordinator().getBaseDataDir().toString());
+    secondProperties.put("discovery.uri", 
queryRunner.getCoordinator().getBaseUrl().toString());
+    queryRunner.createCatalog("gravitino1", "gravitino", secondProperties);
   }
 
   @Test
@@ -121,42 +73,49 @@ public class TestGravitinoConnectorWithMetalakeCatalogName 
extends AbstractTestQ
     // testing the catalogs
     assertThat(computeActual("show 
catalogs").getOnlyColumnAsSet()).contains("gravitino");
     assertThat(computeActual("show 
catalogs").getOnlyColumnAsSet()).contains("gravitino1");
-    assertThat(computeActual("show 
catalogs").getOnlyColumnAsSet()).contains("\"test.memory\"");
+    assertThat(computeActual("show catalogs").getOnlyColumnAsSet())
+        .contains(getTrinoCliCatalogName("test", "memory"));
 
     // testing the gravitino connector framework works.
     assertThat(computeActual("select * from 
system.jdbc.tables").getRowCount()).isGreaterThan(1);
 
     // test metalake named test. the connector name is gravitino
     assertUpdate("call gravitino.system.create_catalog('memory1', 'memory', 
Map())");
-    assertThat(computeActual("show 
catalogs").getOnlyColumnAsSet()).contains("\"test.memory1\"");
+    assertThat(computeActual("show catalogs").getOnlyColumnAsSet())
+        .contains(getTrinoCliCatalogName("test", "memory1"));
     assertUpdate("call gravitino.system.drop_catalog('memory1')");
     assertThat(computeActual("show catalogs").getOnlyColumnAsSet())
-        .doesNotContain("\"test.memory1\"");
+        .doesNotContain(getTrinoCliCatalogName("test", "memory1"));
 
     assertUpdate(
         "call gravitino.system.create_catalog("
             + "catalog=>'memory1', provider=>'memory', properties => 
Map(array['max_ttl'], array['10']), ignore_exist => true)");
-    assertThat(computeActual("show 
catalogs").getOnlyColumnAsSet()).contains("\"test.memory1\"");
+    assertThat(computeActual("show catalogs").getOnlyColumnAsSet())
+        .contains(getTrinoCliCatalogName("test", "memory1"));
 
     assertUpdate(
         "call gravitino.system.drop_catalog(catalog => 'memory1', 
ignore_not_exist => true)");
     assertThat(computeActual("show catalogs").getOnlyColumnAsSet())
-        .doesNotContain("\"test.memory1\"");
+        .doesNotContain(getTrinoCliCatalogName("test", "memory1"));
 
     // test metalake named test1. the connector name is gravitino1
     GravitinoAdminClient gravitinoClient = server.createGravitinoClient();
     gravitinoClient.createMetalake("test1", "", Collections.emptyMap());
 
     assertUpdate("call gravitino1.system.create_catalog('memory1', 'memory', 
Map())");
-    assertThat(computeActual("show 
catalogs").getOnlyColumnAsSet()).contains("\"test1.memory1\"");
+    assertThat(computeActual("show catalogs").getOnlyColumnAsSet())
+        .contains(getTrinoCliCatalogName("test1", "memory1"));
     assertUpdate("call gravitino1.system.drop_catalog('memory1')");
     assertThat(computeActual("show catalogs").getOnlyColumnAsSet())
-        .doesNotContain("\"test1.memory1\"");
+        .doesNotContain(getTrinoCliCatalogName("test1", "memory1"));
   }
 
   @Test
   public void testCreateTable() {
-    String fullSchemaName = "\"test.memory\".db_01";
+    assertThat(computeActual("show catalogs").getOnlyColumnAsSet())
+        .contains(getTrinoCliCatalogName("test", "memory"));
+
+    String fullSchemaName = getTrinoSqlCatalogName("test", "memory") + 
".db_01";
     String tableName = "tb_01";
     String fullTableName = fullSchemaName + "." + tableName;
 
@@ -178,4 +137,12 @@ public class TestGravitinoConnectorWithMetalakeCatalogName 
extends AbstractTestQ
     assertUpdate("drop table " + fullTableName);
     assertUpdate("drop schema " + fullSchemaName);
   }
+
+  protected String getTrinoCliCatalogName(String metalakeName, String 
catalogName) {
+    return "\"" + metalakeName + "." + catalogName + "\"";
+  }
+
+  protected String getTrinoSqlCatalogName(String metalakeName, String 
catalogName) {
+    return "\"\"\"" + metalakeName + "." + catalogName + "\"\"\"";
+  }
 }
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithSkipCatalog.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithSkipCatalog.java
deleted file mode 100644
index c3ad8f63c4..0000000000
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithSkipCatalog.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.trino.connector;
-
-import static io.trino.testing.TestingSession.testSessionBuilder;
-import static org.apache.gravitino.Catalog.Type.RELATIONAL;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import io.trino.Session;
-import io.trino.plugin.memory.MemoryPlugin;
-import io.trino.testing.AbstractTestQueryFramework;
-import io.trino.testing.DistributedQueryRunner;
-import io.trino.testing.MaterializedResult;
-import io.trino.testing.QueryRunner;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import org.apache.gravitino.client.GravitinoAdminClient;
-import org.apache.gravitino.client.GravitinoMetalake;
-import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager;
-import org.awaitility.Awaitility;
-import org.junit.jupiter.api.Test;
-
-public class TestGravitinoConnectorWithSkipCatalog extends 
AbstractTestQueryFramework {
-
-  GravitinoMockServer server;
-
-  @Override
-  protected QueryRunner createQueryRunner() throws Exception {
-    GravitinoAdminClient gravitinoClient = initGravitinoMockServer();
-    Session session = testSessionBuilder().setCatalog("gravitino").build();
-
-    try {
-      DistributedQueryRunner queryRunner =
-          DistributedQueryRunner.builder(session).setNodeCount(1).build();
-
-      TestGravitinoPlugin gravitinoPlugin = new 
TestGravitinoPlugin(gravitinoClient);
-      queryRunner.installPlugin(gravitinoPlugin);
-
-      {
-        // create a gravitino connector with single metalake
-        HashMap<String, String> properties = new HashMap<>();
-        properties.put("gravitino.metalake", "test1");
-        properties.put("gravitino.uri", "http://127.0.0.1:8090";);
-        properties.put("gravitino.trino.skip-catalog-patterns", "a.*, b1");
-        properties.put(
-            "catalog.config-dir", 
queryRunner.getCoordinator().getBaseDataDir().toString());
-        properties.put("discovery.uri", 
queryRunner.getCoordinator().getBaseUrl().toString());
-        queryRunner.createCatalog("gravitino", "gravitino", properties);
-      }
-
-      
GravitinoConnectorPluginManager.instance(this.getClass().getClassLoader())
-          .installPlugin("memory", new MemoryPlugin());
-      CatalogConnectorManager catalogConnectorManager =
-          gravitinoPlugin.getCatalogConnectorManager();
-      server.setCatalogConnectorManager(catalogConnectorManager);
-
-      // Wait for the catalog to be created. Wait for at least 30 seconds.
-      Awaitility.await()
-          .atMost(30, TimeUnit.SECONDS)
-          .pollInterval(1, TimeUnit.SECONDS)
-          .until(() -> !catalogConnectorManager.getCatalogs().isEmpty());
-      return queryRunner;
-
-    } catch (Exception e) {
-      throw new RuntimeException("Create query runner failed", e);
-    }
-  }
-
-  @Test
-  public void testShowCatalogsFilteredBySkipPatterns() throws Exception {
-    MaterializedResult expectedResult = computeActual("show catalogs");
-    assertEquals(expectedResult.getMaterializedRows().size(), 3);
-    List<String> catalogs =
-        expectedResult.getMaterializedRows().stream()
-            .map(row -> (String) row.getField(0))
-            .collect(Collectors.toList());
-    assertFalse(catalogs.contains("a1"));
-    assertFalse(catalogs.contains("b1"));
-    assertTrue(catalogs.contains("b2"));
-  }
-
-  @Test
-  public void testSystemTableQueryFilteredBySkipPatterns() throws Exception {
-    MaterializedResult expectedResult = computeActual("select * from 
gravitino.system.catalog");
-    assertEquals(expectedResult.getMaterializedRows().size(), 1);
-    List<String> catalogs =
-        expectedResult.getMaterializedRows().stream()
-            .map(row -> (String) row.getField(0))
-            .collect(Collectors.toList());
-    assertFalse(catalogs.contains("a1"));
-    assertFalse(catalogs.contains("b1"));
-    assertTrue(catalogs.contains("b2"));
-  }
-
-  private GravitinoAdminClient initGravitinoMockServer() {
-    GravitinoMockServer gravitinoMockServer = new GravitinoMockServer();
-    server = closeAfterClass(gravitinoMockServer);
-    GravitinoAdminClient gravitinoClient = server.createGravitinoClient();
-
-    gravitinoClient.createMetalake("test1", "", Collections.emptyMap());
-    GravitinoMetalake metalake = gravitinoClient.loadMetalake("test1");
-    metalake.createCatalog("a1", RELATIONAL, "", "", Collections.emptyMap());
-    metalake.createCatalog("b1", RELATIONAL, "", "", Collections.emptyMap());
-    metalake.createCatalog("b2", RELATIONAL, "", "", Collections.emptyMap());
-    return gravitinoClient;
-  }
-}
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataGetSystemTable.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataGetSystemTable.java
index cfd846b05b..e3c4403336 100644
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataGetSystemTable.java
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataGetSystemTable.java
@@ -51,7 +51,7 @@ public class TestGravitinoMetadataGetSystemTable {
         .thenReturn(Optional.of(mockSystemTable));
 
     GravitinoMetadata gravitinoMetadata =
-        new GravitinoMetadata(catalogConnectorMetadata, metadataAdapter, 
internalMetadata);
+        new TestGravitinoMetadata(catalogConnectorMetadata, metadataAdapter, 
internalMetadata);
 
     Optional<SystemTable> result = gravitinoMetadata.getSystemTable(session, 
tableName);
 
@@ -73,11 +73,20 @@ public class TestGravitinoMetadataGetSystemTable {
         .thenReturn(Optional.empty());
 
     GravitinoMetadata gravitinoMetadata =
-        new GravitinoMetadata(catalogConnectorMetadata, metadataAdapter, 
internalMetadata);
+        new TestGravitinoMetadata(catalogConnectorMetadata, metadataAdapter, 
internalMetadata);
 
     Optional<SystemTable> result = gravitinoMetadata.getSystemTable(session, 
tableName);
 
     assertFalse(result.isPresent());
     verify(internalMetadata).getSystemTable(session, tableName);
   }
+
+  private static final class TestGravitinoMetadata extends GravitinoMetadata {
+    private TestGravitinoMetadata(
+        CatalogConnectorMetadata catalogConnectorMetadata,
+        CatalogConnectorMetadataAdapter metadataAdapter,
+        ConnectorMetadata internalMetadata) {
+      super(catalogConnectorMetadata, metadataAdapter, internalMetadata);
+    }
+  }
 }
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoPlugin.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoPlugin.java
deleted file mode 100644
index 044deb527f..0000000000
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoPlugin.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.trino.connector;
-
-import com.google.common.collect.ImmutableList;
-import io.trino.spi.connector.ConnectorFactory;
-import org.apache.gravitino.client.GravitinoAdminClient;
-import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager;
-
-public class TestGravitinoPlugin extends GravitinoPlugin {
-  private TestGravitinoConnectorFactory factory;
-
-  private final GravitinoAdminClient gravitinoClient;
-
-  public TestGravitinoPlugin(GravitinoAdminClient gravitinoClient) {
-    this.gravitinoClient = gravitinoClient;
-  }
-
-  @Override
-  public Iterable<ConnectorFactory> getConnectorFactories() {
-    factory = new TestGravitinoConnectorFactory();
-    factory.setGravitinoClient(gravitinoClient);
-    return ImmutableList.of(factory);
-  }
-
-  public CatalogConnectorManager getCatalogConnectorManager() {
-    return factory.getCatalogConnectorManager();
-  }
-}
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/TestCatalogConnectorManager.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/TestCatalogConnectorManager.java
new file mode 100644
index 0000000000..f03cf0a474
--- /dev/null
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/TestCatalogConnectorManager.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector.catalog;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ConnectorContext;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.trino.connector.GravitinoConfig;
+import org.apache.gravitino.trino.connector.GravitinoErrorCode;
+import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog;
+import org.junit.jupiter.api.Test;
+
+public class TestCatalogConnectorManager {
+
+  @Test
+  public void testSingleMetalakeCatalogNaming() throws Exception {
+    CatalogConnectorManager manager =
+        createManager(
+            ImmutableMap.of(
+                "gravitino.uri",
+                "http://127.0.0.1:8090";,
+                "gravitino.metalake",
+                "test",
+                "gravitino.use-single-metalake",
+                "true"));
+
+    assertEquals("memory", manager.getTrinoCatalogName("test", "memory"));
+  }
+
+  @Test
+  public void testMultiMetalakeCatalogNaming() throws Exception {
+    CatalogConnectorManager manager =
+        createManager(
+            ImmutableMap.of(
+                "gravitino.uri",
+                "http://127.0.0.1:8090";,
+                "gravitino.metalake",
+                "test",
+                "gravitino.use-single-metalake",
+                "false"));
+
+    assertEquals("\"test.memory\"", manager.getTrinoCatalogName("test", 
"memory"));
+  }
+
+  @Test
+  public void testSingleMetalakeRejectsDifferentMetalakeConnector() throws 
Exception {
+    CatalogConnectorFactory catalogFactory = createCatalogConnectorFactory();
+    CatalogConnectorManager manager =
+        createManager(
+            catalogFactory,
+            ImmutableMap.of(
+                "gravitino.uri",
+                "http://127.0.0.1:8090";,
+                "gravitino.metalake",
+                "test",
+                "gravitino.use-single-metalake",
+                "true"));
+
+    GravitinoConfig connectorConfig = 
createConnectorConfig(catalogConfigJson("test", "memory"));
+    assertDoesNotThrow(
+        () -> manager.createCatalogConnectorContext("test0", connectorConfig, 
mockContext()));
+
+    GravitinoConfig otherConnectorConfig =
+        createConnectorConfig(catalogConfigJson("test2", "memory"));
+    TrinoException error =
+        assertThrows(
+            TrinoException.class,
+            () ->
+                manager.createCatalogConnectorContext(
+                    "test1", otherConnectorConfig, mockContext()));
+    assertEquals(GravitinoErrorCode.GRAVITINO_OPERATION_FAILED.toErrorCode(), 
error.getErrorCode());
+    assertTrue(error.getMessage().contains("Failed to create connector"));
+    assertTrue(error.getCause().getMessage().contains("Multiple metalakes are 
not supported"));
+  }
+
+  @Test
+  public void testMultiMetalakeAllowsDifferentMetalakeConnector() throws 
Exception {
+    CatalogConnectorFactory catalogFactory = createCatalogConnectorFactory();
+    CatalogConnectorManager manager =
+        createManager(
+            catalogFactory,
+            ImmutableMap.of(
+                "gravitino.uri",
+                "http://127.0.0.1:8090";,
+                "gravitino.metalake",
+                "test",
+                "gravitino.use-single-metalake",
+                "false"));
+
+    GravitinoConfig connectorConfig = 
createConnectorConfig(catalogConfigJson("test", "memory"));
+    assertDoesNotThrow(
+        () -> manager.createCatalogConnectorContext("test0", connectorConfig, 
mockContext()));
+
+    GravitinoConfig otherConnectorConfig =
+        createConnectorConfig(catalogConfigJson("test2", "memory"));
+    assertDoesNotThrow(
+        () -> manager.createCatalogConnectorContext("test1", 
otherConnectorConfig, mockContext()));
+  }
+
+  @Test
+  public void testSkipCatalogPatterns() throws Exception {
+    CatalogConnectorManager manager =
+        createManager(
+            ImmutableMap.of(
+                "gravitino.uri",
+                "http://127.0.0.1:8090";,
+                "gravitino.metalake",
+                "test1",
+                "gravitino.trino.skip-catalog-patterns",
+                "a.*, b1"));
+
+    assertTrue(manager.skipCatalog("a1"));
+    assertTrue(manager.skipCatalog("b1"));
+    assertFalse(manager.skipCatalog("b2"));
+  }
+
+  private CatalogConnectorManager createManager(ImmutableMap<String, String> 
configMap)
+      throws Exception {
+    return createManager(createCatalogConnectorFactory(), configMap);
+  }
+
+  private CatalogConnectorManager createManager(
+      CatalogConnectorFactory catalogFactory, ImmutableMap<String, String> 
configMap) {
+    CatalogRegister catalogRegister = mock(CatalogRegister.class);
+
+    boolean singleMetalakeMode =
+        configMap.getOrDefault("gravitino.use-single-metalake", 
"true").equals("true");
+    CatalogConnectorManager manager =
+        new CatalogConnectorManager(
+            catalogRegister,
+            catalogFactory,
+            singleMetalakeMode
+                ? null
+                : (metalake, catalog) -> String.format("\"%s.%s\"", metalake, 
catalog));
+    manager.config(new GravitinoConfig(configMap), 
mock(GravitinoAdminClient.class));
+    return manager;
+  }
+
+  private CatalogConnectorFactory createCatalogConnectorFactory() throws 
Exception {
+    CatalogConnectorFactory catalogFactory = 
mock(CatalogConnectorFactory.class);
+    CatalogConnectorContext.Builder builder = 
mock(CatalogConnectorContext.Builder.class);
+    
when(catalogFactory.createCatalogConnectorContextBuilder(any())).thenReturn(builder);
+    when(builder.withMetalake(any())).thenReturn(builder);
+    when(builder.withContext(any())).thenReturn(builder);
+    when(builder.build()).thenReturn(mock(CatalogConnectorContext.class));
+    return catalogFactory;
+  }
+
+  private static GravitinoConfig createConnectorConfig(String 
catalogConfigJson) {
+    return new GravitinoConfig(
+        ImmutableMap.of(
+            "gravitino.uri",
+            "http://127.0.0.1:8090";,
+            "gravitino.metalake",
+            "test",
+            GravitinoConfig.GRAVITINO_DYNAMIC_CONNECTOR,
+            "true",
+            GravitinoConfig.GRAVITINO_DYNAMIC_CONNECTOR_CATALOG_CONFIG,
+            catalogConfigJson));
+  }
+
+  private static String catalogConfigJson(String metalake, String name) throws 
Exception {
+    GravitinoCatalog catalog =
+        new GravitinoCatalog(metalake, "memory", name, ImmutableMap.of(), 0L);
+    return GravitinoCatalog.toJson(catalog);
+  }
+
+  private static ConnectorContext mockContext() {
+    return mock(ConnectorContext.class);
+  }
+}
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/hive/TestHiveDataTypeConverter.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/hive/TestHiveDataTypeConverter.java
index 7bf5c49048..315d3c8a9a 100644
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/hive/TestHiveDataTypeConverter.java
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/hive/TestHiveDataTypeConverter.java
@@ -22,7 +22,6 @@ package org.apache.gravitino.trino.connector.catalog.hive;
 import io.trino.spi.TrinoException;
 import org.apache.gravitino.rel.types.Types;
 import org.apache.gravitino.trino.connector.util.GeneralDataTypeTransformer;
-import org.junit.Assert;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -43,7 +42,7 @@ public class TestHiveDataTypeConverter {
 
     io.trino.spi.type.Type charLengthIsOverflow = 
io.trino.spi.type.CharType.createCharType(256);
     Exception e =
-        Assert.assertThrows(
+        Assertions.assertThrows(
             TrinoException.class,
             () -> 
generalDataTypeTransformer.getGravitinoType(charLengthIsOverflow));
     Assertions.assertTrue(
@@ -63,7 +62,7 @@ public class TestHiveDataTypeConverter {
     io.trino.spi.type.Type varcharLengthIsOverflow =
         io.trino.spi.type.VarcharType.createVarcharType(65536);
     e =
-        Assert.assertThrows(
+        Assertions.assertThrows(
             TrinoException.class,
             () -> 
generalDataTypeTransformer.getGravitinoType(varcharLengthIsOverflow));
     Assertions.assertTrue(
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/jdbc/postgresql/TestPostgreSQLDataTypeTransformer.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/jdbc/postgresql/TestPostgreSQLDataTypeTransformer.java
index 9afe765eb0..831fce11f7 100644
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/jdbc/postgresql/TestPostgreSQLDataTypeTransformer.java
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/jdbc/postgresql/TestPostgreSQLDataTypeTransformer.java
@@ -22,7 +22,7 @@ package 
org.apache.gravitino.trino.connector.catalog.jdbc.postgresql;
 import org.apache.gravitino.rel.types.Type;
 import org.apache.gravitino.rel.types.Types;
 import org.apache.gravitino.trino.connector.util.GeneralDataTypeTransformer;
-import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 public class TestPostgreSQLDataTypeTransformer {
@@ -31,29 +31,29 @@ public class TestPostgreSQLDataTypeTransformer {
   public void testTrinoTypeToGravitinoType() {
     GeneralDataTypeTransformer generalDataTypeTransformer = new 
PostgreSQLDataTypeTransformer();
     io.trino.spi.type.Type charTypeWithLengthOne = 
io.trino.spi.type.CharType.createCharType(1);
-    Assert.assertEquals(
+    Assertions.assertEquals(
         generalDataTypeTransformer.getGravitinoType(charTypeWithLengthOne),
         Types.FixedCharType.of(1));
 
     io.trino.spi.type.Type charTypeWithLength = 
io.trino.spi.type.CharType.createCharType(65536);
-    Assert.assertEquals(
+    Assertions.assertEquals(
         generalDataTypeTransformer.getGravitinoType(charTypeWithLength),
         Types.FixedCharType.of(65536));
 
     io.trino.spi.type.Type varcharType = 
io.trino.spi.type.VarcharType.createVarcharType(1);
-    Assert.assertEquals(
+    Assertions.assertEquals(
         generalDataTypeTransformer.getGravitinoType(varcharType), 
Types.VarCharType.of(1));
 
     io.trino.spi.type.Type varcharTypeWithLength =
         io.trino.spi.type.VarcharType.createVarcharType(65536);
 
-    Assert.assertEquals(
+    Assertions.assertEquals(
         generalDataTypeTransformer.getGravitinoType(varcharTypeWithLength),
         Types.VarCharType.of(65536));
 
     io.trino.spi.type.Type varcharTypeWithLength2 =
         io.trino.spi.type.VarcharType.createUnboundedVarcharType();
-    Assert.assertEquals(
+    Assertions.assertEquals(
         generalDataTypeTransformer.getGravitinoType(varcharTypeWithLength2),
         Types.StringType.get());
   }
@@ -62,7 +62,7 @@ public class TestPostgreSQLDataTypeTransformer {
   public void testGravitinoCharToTrinoType() {
     GeneralDataTypeTransformer generalDataTypeTransformer = new 
PostgreSQLDataTypeTransformer();
     Type stringType = Types.StringType.get();
-    Assert.assertEquals(
+    Assertions.assertEquals(
         generalDataTypeTransformer.getTrinoType(stringType),
         io.trino.spi.type.VarcharType.createUnboundedVarcharType());
   }
diff --git 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/system/TestGravitinoSystemConnector.java
 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/system/TestGravitinoSystemConnector.java
index 372b9f2b88..11b5538b08 100644
--- 
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/system/TestGravitinoSystemConnector.java
+++ 
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/system/TestGravitinoSystemConnector.java
@@ -21,18 +21,23 @@ package org.apache.gravitino.trino.connector.system;
 import io.trino.spi.Page;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 public class TestGravitinoSystemConnector {
   @Test
   public void testSystemTablePageSourceReturnsPageOnlyOnce() throws Exception {
     Page page = new Page(0);
     try (GravitinoSystemConnector.SystemTablePageSource pageSource =
-        new GravitinoSystemConnector.SystemTablePageSource(page)) {
+        Mockito.mock(
+            GravitinoSystemConnector.SystemTablePageSource.class,
+            Mockito.withSettings()
+                .useConstructor(page)
+                .defaultAnswer(Mockito.CALLS_REAL_METHODS))) {
 
       Assertions.assertFalse(pageSource.isFinished());
-      Assertions.assertSame(page, pageSource.getNextPage());
+      Assertions.assertSame(page, pageSource.nextPage());
       Assertions.assertTrue(pageSource.isFinished());
-      Assertions.assertNull(pageSource.getNextPage());
+      Assertions.assertNull(pageSource.nextPage());
     }
   }
 
@@ -40,18 +45,22 @@ public class TestGravitinoSystemConnector {
   public void testSystemTablePageSourceMultipleGetNextPageCalls() throws 
Exception {
     Page page = new Page(0);
     try (GravitinoSystemConnector.SystemTablePageSource pageSource =
-        new GravitinoSystemConnector.SystemTablePageSource(page)) {
+        Mockito.mock(
+            GravitinoSystemConnector.SystemTablePageSource.class,
+            Mockito.withSettings()
+                .useConstructor(page)
+                .defaultAnswer(Mockito.CALLS_REAL_METHODS))) {
 
       // First call should return the page
-      Page firstPage = pageSource.getNextPage();
+      Page firstPage = pageSource.nextPage();
       Assertions.assertNotNull(firstPage);
       Assertions.assertSame(page, firstPage);
       Assertions.assertTrue(pageSource.isFinished());
 
       // Subsequent calls should return null
-      Assertions.assertNull(pageSource.getNextPage());
-      Assertions.assertNull(pageSource.getNextPage());
-      Assertions.assertNull(pageSource.getNextPage());
+      Assertions.assertNull(pageSource.nextPage());
+      Assertions.assertNull(pageSource.nextPage());
+      Assertions.assertNull(pageSource.nextPage());
       Assertions.assertTrue(pageSource.isFinished());
     }
   }

Reply via email to