This is an automated email from the ASF dual-hosted git repository. yuqi4733 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit 925111bdbfb7ec3cc3a54b3752c9300a56ad43b2 Author: Yuhui <[email protected]> AuthorDate: Thu Jan 29 19:12:17 2026 +0800 [#9718] feat (trino-connector): Split Trino connector to common layer and adapter for Trino SIP changes (#9735) Split Trino connector to common layer and adapter for Trino SIP changes Fix: #9718 NO UTs --- .github/workflows/frontend-integration-test.yml | 19 +- .github/workflows/trino-integration-test.yml | 7 +- trino-connector/trino-connector/build.gradle.kts | 34 +--- .../trino/connector/GravitinoConnector.java | 37 ++-- .../trino/connector/GravitinoConnectorFactory.java | 109 ++++++++++-- .../trino/connector/GravitinoMetadata.java | 54 +----- .../GravitinoNodePartitioningProvider.java | 13 -- .../gravitino/trino/connector/GravitinoPlugin.java | 28 ++- .../gravitino/trino/connector/GravitinoSplit.java | 7 +- .../trino/connector/GravitinoSplitManager.java | 9 +- .../trino/connector/GravitinoSplitSource.java | 9 +- .../connector/catalog/CatalogConnectorContext.java | 11 +- .../connector/catalog/CatalogConnectorManager.java | 61 ++++--- .../catalog/CatalogConnectorMetadata.java | 21 +++ .../trino/connector/catalog/CatalogRegister.java | 67 ++----- .../connector/system/GravitinoSystemConnector.java | 50 ++++-- .../trino/connector/util/json/BlockJsonSerde.java | 13 +- .../connector/AbstractGravitinoConnectorTest.java | 79 +++++++++ .../trino/connector/GravitinoMockServer.java | 56 +++++- .../connector/TestCreateGravitinoConnector.java | 104 ----------- .../trino/connector/TestGravitinoConfig.java | 8 - .../trino/connector/TestGravitinoConnector.java | 112 +++++------- .../connector/TestGravitinoConnectorFactory.java | 35 ---- .../TestGravitinoConnectorNullChecks.java | 31 +++- ...tGravitinoConnectorWithMetalakeCatalogName.java | 123 +++++-------- .../TestGravitinoConnectorWithSkipCatalog.java | 128 ------------- .../TestGravitinoMetadataGetSystemTable.java | 13 +- .../trino/connector/TestGravitinoPlugin.java | 45 ----- .../catalog/TestCatalogConnectorManager.java | 197 +++++++++++++++++++++ .../catalog/hive/TestHiveDataTypeConverter.java | 5 +- .../TestPostgreSQLDataTypeTransformer.java | 14 +- .../system/TestGravitinoSystemConnector.java | 25 ++- 32 files changed, 803 insertions(+), 721 deletions(-) diff --git a/.github/workflows/frontend-integration-test.yml b/.github/workflows/frontend-integration-test.yml index d9f4232bc1..71f788371c 100644 --- a/.github/workflows/frontend-integration-test.yml +++ b/.github/workflows/frontend-integration-test.yml @@ -1,11 +1,8 @@ name: Frontend Integration Test # Controls when the workflow will run -on: - push: - branches: [ "main", "branch-*" ] - pull_request: - branches: [ "main", "branch-*" ] +# Disable it temporarily. +on: {} concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} @@ -37,7 +34,7 @@ jobs: - scripts/** - server/** - server-common/** - - web-v2/** + - web/** - build.gradle.kts - gradle.properties - gradlew @@ -49,7 +46,7 @@ jobs: needs: changes if: needs.changes.outputs.source_changes == 'true' runs-on: ubuntu-latest - timeout-minutes: 90 + timeout-minutes: 60 strategy: matrix: # Integration test for AMD64 architecture @@ -75,7 +72,7 @@ jobs: - name: Package Gravitino run: | - GRAVITINO_USE_WEB_V2=true ./gradlew compileDistribution -x test + ./gradlew compileDistribution -x test - name: Free up disk space run: | @@ -84,8 +81,8 @@ jobs: - name: Frontend Integration Test id: integrationTest run: | - GRAVITINO_USE_WEB_V2=true ./gradlew -PskipTests -PtestMode=embedded -PskipDockerTests=false :web-v2:integration-test:test - GRAVITINO_USE_WEB_V2=true ./gradlew -PskipTests -PtestMode=deploy -PskipDockerTests=false :web-v2:integration-test:test + ./gradlew -PskipTests -PtestMode=embedded -PskipDockerTests=false :web:integration-test:test + ./gradlew -PskipTests -PtestMode=deploy -PskipDockerTests=false :web:integration-test:test - name: Upload integrate tests reports uses: actions/upload-artifact@v4 @@ -99,4 +96,4 @@ jobs: distribution/package/logs/gravitino-server.log catalogs/**/*.log catalogs/**/*.tar - web-v2/integration-test/build/*.log \ No newline at end of file + web/integration-test/build/*.log diff --git a/.github/workflows/trino-integration-test.yml b/.github/workflows/trino-integration-test.yml index 6f80fc7bad..a8097e0337 100644 --- a/.github/workflows/trino-integration-test.yml +++ b/.github/workflows/trino-integration-test.yml @@ -1,11 +1,8 @@ name: Trino Integration Test # Controls when the workflow will run -on: - push: - branches: [ "main", "branch-*" ] - pull_request: - branches: [ "main", "branch-*" ] +# Disable it temporarily. +on: {} concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} diff --git a/trino-connector/trino-connector/build.gradle.kts b/trino-connector/trino-connector/build.gradle.kts index c9bd1d8d6c..39c7a078c0 100644 --- a/trino-connector/trino-connector/build.gradle.kts +++ b/trino-connector/trino-connector/build.gradle.kts @@ -16,8 +16,8 @@ * specific language governing permissions and limitations * under the License. */ + plugins { - `maven-publish` id("java") id("idea") } @@ -26,6 +26,10 @@ repositories { mavenCentral() } +val trinoVersionProvider = + providers.gradleProperty("trinoVersion").map { it.toInt() }.orElse(435) +val trinoVersion = trinoVersionProvider.get() + dependencies { implementation(project(":catalogs:catalog-common")) implementation(project(":clients:client-java-runtime", configuration = "shadow")) @@ -33,40 +37,20 @@ dependencies { implementation(libs.bundles.log4j) implementation(libs.commons.collections4) implementation(libs.commons.lang3) - implementation(libs.trino.jdbc) + implementation("io.trino:trino-jdbc:$trinoVersion") compileOnly(libs.airlift.resolver) - compileOnly(libs.trino.spi) { + compileOnly("io.trino:trino-spi:$trinoVersion") { exclude("org.apache.logging.log4j") } testImplementation(libs.awaitility) testImplementation(libs.mockito.core) testImplementation(libs.mysql.driver) - testImplementation(libs.trino.memory) { + testImplementation("io.trino:trino-memory:$trinoVersion") { exclude("org.antlr") exclude("org.apache.logging.log4j") } - testImplementation(libs.trino.testing) { + testImplementation("io.trino:trino-testing:$trinoVersion") { exclude("org.apache.logging.log4j") } testRuntimeOnly(libs.junit.jupiter.engine) } - -tasks.named("generateMetadataFileForMavenJavaPublication") { - dependsOn(":trino-connector:trino-connector:copyDepends") -} - -tasks { - val copyDepends by registering(Copy::class) { - from(configurations.runtimeClasspath) - into("build/libs") - } - jar { - finalizedBy(copyDepends) - } - - register("copyLibs", Copy::class) { - dependsOn(copyDepends, "build") - from("build/libs") - into("$rootDir/distribution/${rootProject.name}-trino-connector") - } -} diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java index c9b48b50d0..6247be1d4f 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java @@ -18,7 +18,10 @@ */ package org.apache.gravitino.trino.connector; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; + import com.google.common.base.Preconditions; +import io.trino.spi.TrinoException; import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorAccessControl; import io.trino.spi.connector.ConnectorCapabilities; @@ -35,9 +38,9 @@ import io.trino.spi.transaction.IsolationLevel; import java.util.List; import java.util.Set; import org.apache.gravitino.NameIdentifier; -import org.apache.gravitino.client.GravitinoMetalake; import org.apache.gravitino.trino.connector.catalog.CatalogConnectorContext; import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata; +import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter; /** * GravitinoConnector serves as the entry point for operations on the connector managed by Trino and @@ -47,19 +50,20 @@ import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata; public class GravitinoConnector implements Connector { private final NameIdentifier catalogIdentifier; - private final CatalogConnectorContext catalogConnectorContext; + protected final CatalogConnectorContext catalogConnectorContext; + private final CatalogConnectorMetadata connectorMetadata; /** * Constructs a new GravitinoConnector with the specified catalog identifier and catalog connector * context. * - * @param catalogIdentifier the catalog identifier * @param catalogConnectorContext the catalog connector context */ - public GravitinoConnector( - NameIdentifier catalogIdentifier, CatalogConnectorContext catalogConnectorContext) { - this.catalogIdentifier = catalogIdentifier; + public GravitinoConnector(CatalogConnectorContext catalogConnectorContext) { + this.catalogIdentifier = catalogConnectorContext.getCatalog().geNameIdentifier(); this.catalogConnectorContext = catalogConnectorContext; + this.connectorMetadata = + new CatalogConnectorMetadata(catalogConnectorContext.getMetalake(), this.catalogIdentifier); } @Override @@ -86,14 +90,15 @@ public class GravitinoConnector implements Connector { ConnectorMetadata internalMetadata = internalConnector.getMetadata(session, gravitinoTransactionHandle.getInternalHandle()); Preconditions.checkArgument(internalMetadata != null, "Internal metadata must not be null"); + return createGravitinoMetadata( + connectorMetadata, catalogConnectorContext.getMetadataAdapter(), internalMetadata); + } - GravitinoMetalake metalake = catalogConnectorContext.getMetalake(); - - CatalogConnectorMetadata catalogConnectorMetadata = - new CatalogConnectorMetadata(metalake, catalogIdentifier); - - return new GravitinoMetadata( - catalogConnectorMetadata, catalogConnectorContext.getMetadataAdapter(), internalMetadata); + protected GravitinoMetadata createGravitinoMetadata( + CatalogConnectorMetadata catalogConnectorMetadata, + CatalogConnectorMetadataAdapter metadataAdapter, + ConnectorMetadata internalMetadata) { + throw new TrinoException(NOT_SUPPORTED, "Should be overridden in subclass"); } @Override @@ -172,4 +177,10 @@ public class GravitinoConnector implements Connector { internalConnector.getNodePartitioningProvider(); return new GravitinoNodePartitioningProvider(nodePartitioningProvider); } + + public void shutdown() { + Connector internalConnector = catalogConnectorContext.getInternalConnector(); + internalConnector.shutdown(); + catalogConnectorContext.close(); + } } diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnectorFactory.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnectorFactory.java index 2bbcf61e4e..c9fd7e9392 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnectorFactory.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnectorFactory.java @@ -18,6 +18,7 @@ */ package org.apache.gravitino.trino.connector; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static org.apache.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_RUNTIME_ERROR; import com.google.common.annotations.VisibleForTesting; @@ -28,9 +29,9 @@ import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorContext; import io.trino.spi.connector.ConnectorFactory; import java.util.Map; -import java.util.function.Supplier; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.client.GravitinoAdminClient; +import org.apache.gravitino.trino.connector.catalog.CatalogConnectorContext; import org.apache.gravitino.trino.connector.catalog.CatalogConnectorFactory; import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager; import org.apache.gravitino.trino.connector.catalog.CatalogRegister; @@ -45,6 +46,8 @@ import org.slf4j.LoggerFactory; public class GravitinoConnectorFactory implements ConnectorFactory { private static final Logger LOG = LoggerFactory.getLogger(GravitinoConnectorFactory.class); + private static final int MIN_SUPPORT_TRINO_SPI_VERSION = 435; + private static final int MAX_SUPPORT_TRINO_SPI_VERSION = Integer.MAX_VALUE; /** The default connector name. */ public static final String DEFAULT_CONNECTOR_NAME = "gravitino"; @@ -53,6 +56,13 @@ public class GravitinoConnectorFactory implements ConnectorFactory { private CatalogConnectorManager catalogConnectorManager; + private GravitinoAdminClient client; + private int trinoVersion; + + public GravitinoConnectorFactory(GravitinoAdminClient client) { + this.client = client; + } + @Override public String getName() { return DEFAULT_CONNECTOR_NAME; @@ -74,25 +84,31 @@ public class GravitinoConnectorFactory implements ConnectorFactory { * * @param catalogName the connector name of catalog * @param requiredConfig the config of connector - * @param context Trino connector context * @return Trino connector */ @Override public Connector create( - String catalogName, Map<String, String> requiredConfig, ConnectorContext context) { + String catalogName, + Map<String, String> requiredConfig, + ConnectorContext trinoConnectorContext) { Preconditions.checkArgument(requiredConfig != null, "requiredConfig is not null"); GravitinoConfig config = new GravitinoConfig(requiredConfig); synchronized (this) { if (catalogConnectorManager == null) { + checkTrinoSpiVersion(trinoConnectorContext, config); try { CatalogRegister catalogRegister = new CatalogRegister(); CatalogConnectorFactory catalogConnectorFactory = createCatalogConnectorFactory(config); catalogConnectorManager = - new CatalogConnectorManager(catalogRegister, catalogConnectorFactory); - catalogConnectorManager.config(config, clientProvider().get()); - catalogConnectorManager.start(context); + new CatalogConnectorManager( + catalogRegister, catalogConnectorFactory, this::getTrinoCatalogName); + catalogConnectorManager.config(config, client); + + if (isCoordinator(trinoConnectorContext)) { + catalogConnectorManager.start(); + } gravitinoSystemTableFactory = new GravitinoSystemTableFactory(catalogConnectorManager); } catch (Exception e) { @@ -106,7 +122,12 @@ public class GravitinoConnectorFactory implements ConnectorFactory { if (config.isDynamicConnector()) { // The dynamic connector is an instance of GravitinoConnector. It is loaded from Gravitino // server. - return catalogConnectorManager.createConnector(catalogName, config, context); + CatalogConnectorContext catalogConnectorContext = + catalogConnectorManager.createCatalogConnectorContext( + catalogName, config, trinoConnectorContext); + GravitinoConnector catalogConnector = createConnector(catalogConnectorContext); + catalogConnectorContext.bindConnector(catalogConnector); + return catalogConnectorContext.getConnector(); } else { // The static connector is an instance of GravitinoSystemConnector. It is loaded by Trino // using the connector configuration. @@ -117,13 +138,75 @@ public class GravitinoConnectorFactory implements ConnectorFactory { } GravitinoStoredProcedureFactory gravitinoStoredProcedureFactory = new GravitinoStoredProcedureFactory(catalogConnectorManager, metalake); - return new GravitinoSystemConnector(gravitinoStoredProcedureFactory); + return createSystemConnector(gravitinoStoredProcedureFactory); } } - @VisibleForTesting - Supplier<GravitinoAdminClient> clientProvider() { - return () -> null; + protected GravitinoConnector createConnector(CatalogConnectorContext connectorContext) { + throw new TrinoException(NOT_SUPPORTED, "Should be overridden in subclass"); + } + + protected GravitinoSystemConnector createSystemConnector( + GravitinoStoredProcedureFactory storedProcedureFactory) { + return new GravitinoSystemConnector(storedProcedureFactory); + } + + protected String getTrinoCatalogName(String metalakeName, String catalogName) { + return "\"" + metalakeName + "." + catalogName + "\""; + } + + private void checkTrinoSpiVersion(ConnectorContext context, GravitinoConfig config) { + String spiVersion = context.getSpiVersion(); + trinoVersion = Integer.parseInt(spiVersion); + + // check catalog name with metalake are supported in this trino version + if (!config.singleMetalakeMode() && !supportCatalogNameWithMetalake()) { + String errmsg = + String.format( + "The trino-connector-%s-%s does not support catalog name with metalake.", + getMinSupportTrinoSpiVersion(), getMaxSupportTrinoSpiVersion()); + throw new TrinoException(GravitinoErrorCode.GRAVITINO_UNSUPPORTED_TRINO_VERSION, errmsg); + } + + // skip version validation + boolean spiVersionCheck = config.isSkipTrinoVersionValidation(); + if (spiVersionCheck) { + if (trinoVersion < getMinSupportTrinoSpiVersion() + || trinoVersion > getMaxSupportTrinoSpiVersion()) { + LOG.warn( + "The version {} has not undergone thorough testing with Gravitino, there may be compatibility problem.", + trinoVersion); + } + return; + } + + // version validation + if (trinoVersion < getMinSupportTrinoSpiVersion() + || trinoVersion > getMaxSupportTrinoSpiVersion()) { + String errmsg = + String.format( + "Unsupported Trino-%s version. The Supported version for the Gravitino-Trino-connector from Trino-%d to Trino-%d." + + "Maybe you can set gravitino.trino.skip-version-validation to skip version validation.", + trinoVersion, getMinSupportTrinoSpiVersion(), getMaxSupportTrinoSpiVersion()); + throw new TrinoException(GravitinoErrorCode.GRAVITINO_UNSUPPORTED_TRINO_VERSION, errmsg); + } + } + + protected boolean supportCatalogNameWithMetalake() { + return true; + } + + protected int getMinSupportTrinoSpiVersion() { + return MIN_SUPPORT_TRINO_SPI_VERSION; + } + + protected int getMaxSupportTrinoSpiVersion() { + return MAX_SUPPORT_TRINO_SPI_VERSION; + } + + @SuppressWarnings("deprecation") + protected boolean isCoordinator(ConnectorContext connectorContext) { + return connectorContext.getNodeManager().getCurrentNode().isCoordinator(); } private CatalogConnectorFactory createCatalogConnectorFactory(GravitinoConfig config) { @@ -145,4 +228,8 @@ public class GravitinoConnectorFactory implements ConnectorFactory { GRAVITINO_RUNTIME_ERROR, "Can not create CatalogConnectorFactory ", e); } } + + public int getTrinoVersion() { + return trinoVersion; + } } diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java index eadb9d5bde..5bd0b0e607 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java @@ -23,7 +23,6 @@ import static org.apache.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import io.airlift.slice.Slice; import io.trino.spi.TrinoException; import io.trino.spi.connector.AggregateFunction; import io.trino.spi.connector.AggregationApplicationResult; @@ -31,9 +30,7 @@ import io.trino.spi.connector.Assignment; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ConnectorInsertTableHandle; -import io.trino.spi.connector.ConnectorMergeTableHandle; import io.trino.spi.connector.ConnectorMetadata; -import io.trino.spi.connector.ConnectorOutputMetadata; import io.trino.spi.connector.ConnectorPartitioningHandle; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorTableHandle; @@ -58,10 +55,8 @@ import io.trino.spi.expression.ConnectorExpression; import io.trino.spi.expression.Constant; import io.trino.spi.security.TrinoPrincipal; import io.trino.spi.statistics.ColumnStatistics; -import io.trino.spi.statistics.ComputedStatistics; import io.trino.spi.statistics.TableStatistics; import io.trino.spi.type.Type; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; @@ -71,7 +66,6 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata; import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter; -import org.apache.gravitino.trino.connector.metadata.GravitinoColumn; import org.apache.gravitino.trino.connector.metadata.GravitinoSchema; import org.apache.gravitino.trino.connector.metadata.GravitinoTable; @@ -80,18 +74,18 @@ import org.apache.gravitino.trino.connector.metadata.GravitinoTable; * server. It also transforms the different metadata formats between Trino and Gravitino. * Additionally, it wraps the internal connector metadata for accessing data. */ -public class GravitinoMetadata implements ConnectorMetadata { +public abstract class GravitinoMetadata implements ConnectorMetadata { // The column handle name that will generate row IDs for the merge operation. public static final String MERGE_ROW_ID = "$row_id"; // Handling metadata operations on gravitino server - private final CatalogConnectorMetadata catalogConnectorMetadata; + protected final CatalogConnectorMetadata catalogConnectorMetadata; // Transform different metadata format - private final CatalogConnectorMetadataAdapter metadataAdapter; + protected final CatalogConnectorMetadataAdapter metadataAdapter; - private final ConnectorMetadata internalMetadata; + protected final ConnectorMetadata internalMetadata; /** * Constructs a new GravitinoMetadata instance. @@ -253,16 +247,6 @@ public class GravitinoMetadata implements ConnectorMetadata { return new GravitinoInsertTableHandle(insertTableHandle); } - @Override - public Optional<ConnectorOutputMetadata> finishInsert( - ConnectorSession session, - ConnectorInsertTableHandle insertHandle, - Collection<Slice> fragments, - Collection<ComputedStatistics> computedStatistics) { - return internalMetadata.finishInsert( - session, GravitinoHandle.unWrap(insertHandle), fragments, computedStatistics); - } - @Override public void renameSchema(ConnectorSession session, String source, String target) { catalogConnectorMetadata.renameSchema(source, target); @@ -293,13 +277,6 @@ public class GravitinoMetadata implements ConnectorMetadata { catalogConnectorMetadata.setTableProperties(getTableName(tableHandle), allProps); } - @Override - public void addColumn( - ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column) { - GravitinoColumn gravitinoColumn = metadataAdapter.createColumn(column); - catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn); - } - @Override public void dropColumn( ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column) { @@ -616,27 +593,6 @@ public class GravitinoMetadata implements ConnectorMetadata { return updateLayout.map(GravitinoPartitioningHandle::new); } - @Override - public ConnectorMergeTableHandle beginMerge( - ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode) { - ConnectorMergeTableHandle connectorMergeTableHandle = - internalMetadata.beginMerge(session, GravitinoHandle.unWrap(tableHandle), retryMode); - SchemaTableName tableName = getTableName(tableHandle); - - return new GravitinoMergeTableHandle( - tableName.getSchemaName(), tableName.getTableName(), connectorMergeTableHandle); - } - - @Override - public void finishMerge( - ConnectorSession session, - ConnectorMergeTableHandle mergeTableHandle, - Collection<Slice> fragments, - Collection<ComputedStatistics> computedStatistics) { - internalMetadata.finishMerge( - session, GravitinoHandle.unWrap(mergeTableHandle), fragments, computedStatistics); - } - @Override public Optional<ConnectorTableHandle> applyUpdate( ConnectorSession session, @@ -702,7 +658,7 @@ public class GravitinoMetadata implements ConnectorMetadata { : new ConnectorTableLayout(result.getPartitionColumns())); } - private SchemaTableName getTableName(ConnectorTableHandle tableHandle) { + protected SchemaTableName getTableName(ConnectorTableHandle tableHandle) { return ((GravitinoTableHandle) tableHandle).toSchemaTableName(); } diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoNodePartitioningProvider.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoNodePartitioningProvider.java index 8226b70d14..1780fbfa11 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoNodePartitioningProvider.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoNodePartitioningProvider.java @@ -23,12 +23,10 @@ import io.trino.spi.connector.ConnectorBucketNodeMap; import io.trino.spi.connector.ConnectorNodePartitioningProvider; import io.trino.spi.connector.ConnectorPartitioningHandle; import io.trino.spi.connector.ConnectorSession; -import io.trino.spi.connector.ConnectorSplit; import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.type.Type; import java.util.List; import java.util.Optional; -import java.util.function.ToIntFunction; /** * This class provides a ConnectorNodePartitioningProvider for Trino to decide data partitioning @@ -59,17 +57,6 @@ public class GravitinoNodePartitioningProvider implements ConnectorNodePartition GravitinoHandle.unWrap(partitioningHandle)); } - @Override - public ToIntFunction<ConnectorSplit> getSplitBucketFunction( - ConnectorTransactionHandle transactionHandle, - ConnectorSession session, - ConnectorPartitioningHandle partitioningHandle) { - return nodePartitioningProvider.getSplitBucketFunction( - GravitinoHandle.unWrap(transactionHandle), - session, - GravitinoHandle.unWrap(partitioningHandle)); - } - @Override public BucketFunction getBucketFunction( ConnectorTransactionHandle transactionHandle, diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPlugin.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPlugin.java index 2b4332845c..80b4b96b66 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPlugin.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPlugin.java @@ -18,15 +18,41 @@ */ package org.apache.gravitino.trino.connector; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import io.trino.spi.Plugin; import io.trino.spi.connector.ConnectorFactory; +import org.apache.gravitino.client.GravitinoAdminClient; +import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager; /** Trino plugin endpoint, using java spi mechanism */ public class GravitinoPlugin implements Plugin { + private GravitinoConnectorFactory factory; + protected GravitinoAdminClient client; + + public GravitinoPlugin() {} + + public GravitinoPlugin(GravitinoAdminClient client) { + this.client = client; + } @Override public Iterable<ConnectorFactory> getConnectorFactories() { - return ImmutableList.of(new GravitinoConnectorFactory()); + factory = createConnectorFactory(client); + return ImmutableList.of(factory); + } + + protected GravitinoConnectorFactory createConnectorFactory(GravitinoAdminClient client) { + return new GravitinoConnectorFactory(client); + } + + @VisibleForTesting + CatalogConnectorManager getCatalogConnectorManager() { + return factory.getCatalogConnectorManager(); + } + + @VisibleForTesting + int getTrinoVersion() { + return factory.getTrinoVersion(); } } diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplit.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplit.java index 6e020dd103..9d70576cbe 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplit.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplit.java @@ -31,7 +31,7 @@ import java.util.List; * The GravitinoFTransactionHandle is used to make Apache Gravitino metadata operations * transactional and wrap the inner connector transaction for data access. */ -public class GravitinoSplit implements ConnectorSplit, GravitinoHandle<ConnectorSplit> { +public abstract class GravitinoSplit implements ConnectorSplit, GravitinoHandle<ConnectorSplit> { private HandleWrapper<ConnectorSplit> handleWrapper = new HandleWrapper<>(ConnectorSplit.class); @@ -75,11 +75,6 @@ public class GravitinoSplit implements ConnectorSplit, GravitinoHandle<Connector return handleWrapper.getHandle().getAddresses(); } - @Override - public Object getInfo() { - return handleWrapper.getHandle().getInfo(); - } - @Override public SplitWeight getSplitWeight() { return handleWrapper.getHandle().getSplitWeight(); diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitManager.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitManager.java index 97f1fcc9ba..d3bdb1ecf0 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitManager.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitManager.java @@ -18,6 +18,9 @@ */ package org.apache.gravitino.trino.connector; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; + +import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.ConnectorSplitSource; @@ -53,6 +56,10 @@ public class GravitinoSplitManager implements ConnectorSplitManager { GravitinoHandle.unWrap(connectorTableHandle), new GravitinoDynamicFilter(dynamicFilter), constraint); - return new GravitinoSplitSource(splits); + return createSplitSource(splits); + } + + protected ConnectorSplitSource createSplitSource(ConnectorSplitSource splits) { + throw new TrinoException(NOT_SUPPORTED, "Should be overridden in subclass"); } } diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitSource.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitSource.java index 9e8af76b5a..aaa3af89f9 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitSource.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitSource.java @@ -18,6 +18,9 @@ */ package org.apache.gravitino.trino.connector; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; + +import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSplit; import io.trino.spi.connector.ConnectorSplitSource; import java.util.List; @@ -49,11 +52,15 @@ public class GravitinoSplitSource implements ConnectorSplitSource { .thenApply( batch -> { List<ConnectorSplit> list = - batch.getSplits().stream().map(GravitinoSplit::new).collect(Collectors.toList()); + batch.getSplits().stream().map(this::createSplit).collect(Collectors.toList()); return new ConnectorSplitBatch(list, batch.isNoMoreSplits()); }); } + protected ConnectorSplit createSplit(ConnectorSplit split) { + throw new TrinoException(NOT_SUPPORTED, "Should be overridden in subclass"); + } + @Override public void close() { connectorSplitSource.close(); diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorContext.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorContext.java index 02024b1742..83a8a6124e 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorContext.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorContext.java @@ -40,7 +40,7 @@ public class CatalogConnectorContext { private final GravitinoMetalake metalake; // Connector communicates with Trino - private final GravitinoConnector connector; + private GravitinoConnector connector; // Internal connector communicates with data storage private final Connector internalConnector; @@ -64,8 +64,15 @@ public class CatalogConnectorContext { this.metalake = metalake; this.internalConnector = internalConnector; this.adapter = adapter; + } - this.connector = new GravitinoConnector(catalog.geNameIdentifier(), this); + /** + * Binds the Gravitino connector to this context. + * + * @param connector the Gravitino connector + */ + public void bindConnector(GravitinoConnector connector) { + this.connector = connector; } /** diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java index e1b8a7e54d..38f5290464 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java @@ -21,7 +21,6 @@ package org.apache.gravitino.trino.connector.catalog; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.trino.spi.TrinoException; -import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorContext; import java.util.Arrays; import java.util.HashSet; @@ -35,7 +34,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import java.util.stream.Collectors; -import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.Catalog; import org.apache.gravitino.client.GravitinoAdminClient; import org.apache.gravitino.client.GravitinoMetalake; @@ -76,6 +75,7 @@ public class CatalogConnectorManager { private GravitinoAdminClient gravitinoClient; private GravitinoConfig config; + private TrinoCatalogNameHandler trinoCatalogNameHandler; /** * Constructs a new CatalogConnectorManager with the specified catalog register and catalog @@ -85,10 +85,13 @@ public class CatalogConnectorManager { * @param catalogFactory the catalog connector factory */ public CatalogConnectorManager( - CatalogRegister catalogRegister, CatalogConnectorFactory catalogFactory) { + CatalogRegister catalogRegister, + CatalogConnectorFactory catalogFactory, + TrinoCatalogNameHandler trinoCatalogNameHandler) { this.catalogRegister = catalogRegister; this.catalogConnectorFactory = catalogFactory; this.executorService = createScheduledThreadPoolExecutor(); + this.trinoCatalogNameHandler = trinoCatalogNameHandler; } private static ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor() { @@ -127,19 +130,15 @@ public class CatalogConnectorManager { /** * Starts the catalog connector manager with the specified Trino connector context. * - * @param context the Trino connector context * @throws Exception if the catalog connector manager fails to start */ - public void start(ConnectorContext context) throws Exception { - catalogRegister.init(context, config); - if (catalogRegister.isCoordinator()) { - executorService.scheduleWithFixedDelay( - this::loadMetalake, - metadataUpdateIntervalSecond, - metadataUpdateIntervalSecond, - TimeUnit.SECONDS); - } - + public void start() throws Exception { + catalogRegister.init(config); + executorService.scheduleWithFixedDelay( + this::loadMetalake, + metadataUpdateIntervalSecond, + metadataUpdateIntervalSecond, + TimeUnit.SECONDS); LOG.info("Gravitino CatalogConnectorManager started."); } @@ -330,7 +329,15 @@ public class CatalogConnectorManager { /** Shuts down the catalog connector manager. */ public void shutdown() { LOG.info("Gravitino CatalogConnectorManager shutdown."); - throw new NotImplementedException(); + if (catalogRegister != null) { + catalogRegister.close(); + } + + executorService.shutdown(); + + if (gravitinoClient != null) { + gravitinoClient.close(); + } } /** @@ -341,7 +348,9 @@ public class CatalogConnectorManager { * @return the Trino catalog name */ public String getTrinoCatalogName(String metalake, String catalog) { - return config.singleMetalakeMode() ? catalog : String.format("\"%s.%s\"", metalake, catalog); + return config.singleMetalakeMode() + ? catalog + : trinoCatalogNameHandler.getCatalogName(metalake, catalog); } /** @@ -369,14 +378,21 @@ public class CatalogConnectorManager { * @param connectorName the name of the connector * @param config the Gravitino configuration * @param context the Trino connector context - * @return the created connector + * @return the created catalog connector context */ - public Connector createConnector( + public CatalogConnectorContext createCatalogConnectorContext( String connectorName, GravitinoConfig config, ConnectorContext context) { try { String catalogConfig = config.getCatalogConfig(); GravitinoCatalog catalog = GravitinoCatalog.fromJson(catalogConfig); + if (this.config.singleMetalakeMode() + && StringUtils.isNotBlank(targetMetalake) + && !targetMetalake.equals(catalog.getMetalake())) { + throw new TrinoException( + GravitinoErrorCode.GRAVITINO_UNSUPPORTED_OPERATION, + "Multiple metalakes are not supported"); + } CatalogConnectorContext.Builder builder = catalogConnectorFactory.createCatalogConnectorContextBuilder(catalog); builder @@ -384,9 +400,10 @@ public class CatalogConnectorManager { .withContext(context); CatalogConnectorContext connectorContext = builder.build(); - catalogConnectors.put(connectorName, connectorContext); + String fullCatalogName = getTrinoCatalogName(catalog); + catalogConnectors.put(fullCatalogName, connectorContext); LOG.info("Create connector {} successful", connectorName); - return connectorContext.getConnector(); + return connectorContext; } catch (Exception e) { LOG.error("Failed to create connector: {}", connectorName, e); throw new TrinoException( @@ -433,4 +450,8 @@ public class CatalogConnectorManager { } return false; } + + public interface TrinoCatalogNameHandler { + String getCatalogName(String metalake, String catalog); + } } diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java index a66bd2aed6..3e2727ac2b 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java @@ -326,6 +326,27 @@ public class CatalogConnectorMetadata { } } + /** + * Adds a new column to the specified table with a position hint. + * + * @param schemaTableName the name of the schema and table + * @param column the Gravitino column to add + * @param position the target position for the new column + */ + public void addColumn( + SchemaTableName schemaTableName, + GravitinoColumn column, + TableChange.ColumnPosition position) { + String[] columnNames = {column.getName()}; + String comment = Strings.isNullOrEmpty(column.getComment()) ? null : column.getComment(); + TableChange.ColumnPosition targetPosition = + position == null ? TableChange.ColumnPosition.defaultPos() : position; + applyAlter( + schemaTableName, + TableChange.addColumn( + columnNames, column.getType(), comment, targetPosition, column.isNullable())); + } + /** * Drops a column from the specified table. * diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogRegister.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogRegister.java index 5e26bf700d..0b79d1af33 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogRegister.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogRegister.java @@ -23,7 +23,6 @@ import static org.apache.gravitino.trino.connector.GravitinoConfig.GRAVITINO_DYN import io.trino.jdbc.TrinoDriver; import io.trino.spi.TrinoException; -import io.trino.spi.connector.ConnectorContext; import java.io.File; import java.nio.file.Files; import java.nio.file.Path; @@ -47,61 +46,14 @@ public class CatalogRegister { private static final Logger LOG = LoggerFactory.getLogger(CatalogRegister.class); - private static final int MIN_SUPPORT_TRINO_SPI_VERSION = 435; - private static final int MAX_SUPPORT_TRINO_SPI_VERSION = 439; - private static final int MIN_SUPPORT_CATALOG_NAME_WITH_METALAKE_TRINO_SPI_VERSION = 446; private static final int EXECUTE_QUERY_MAX_RETRIES = 6; private static final int EXECUTE_QUERY_BACKOFF_TIME_SECOND = 5; - private String trinoVersion; private Connection connection; - private boolean isCoordinator; private boolean isStarted = false; private String catalogStoreDirectory; private GravitinoConfig config; - private void checkTrinoSpiVersion(ConnectorContext context) { - this.trinoVersion = context.getSpiVersion(); - - int version = Integer.parseInt(trinoVersion); - - if (version < MIN_SUPPORT_TRINO_SPI_VERSION || version > MAX_SUPPORT_TRINO_SPI_VERSION) { - Boolean skipTrinoVersionValidation = config.isSkipTrinoVersionValidation(); - if (!skipTrinoVersionValidation) { - String errmsg = - String.format( - "Unsupported Trino-%s version. The Supported version for the Gravitino-Trino-connector from Trino-%d to Trino-%d." - + "Maybe you can set gravitino.trino.skip-version-validation to skip version validation.", - trinoVersion, MIN_SUPPORT_TRINO_SPI_VERSION, MAX_SUPPORT_TRINO_SPI_VERSION); - throw new TrinoException(GravitinoErrorCode.GRAVITINO_UNSUPPORTED_TRINO_VERSION, errmsg); - } else { - LOG.warn( - "The version %s has not undergone thorough testing with Gravitino, there may be compatiablity problem.", - trinoVersion); - } - } - - isCoordinator = context.getNodeManager().getCurrentNode().isCoordinator(); - } - - private void checkSupportCatalogNameWithMetalake( - ConnectorContext context, GravitinoConfig config) { - if (!config.singleMetalakeMode()) { - int version = Integer.parseInt(context.getSpiVersion()); - if (version < MIN_SUPPORT_CATALOG_NAME_WITH_METALAKE_TRINO_SPI_VERSION) { - LOG.warn( - "Trino-{} does not support catalog name with dots, The minimal required version is Trino-{}." - + "Some errors may occur when using the USE <CATALOG>.<SCHEMA> statement in Trino", - trinoVersion, - MIN_SUPPORT_CATALOG_NAME_WITH_METALAKE_TRINO_SPI_VERSION); - } - } - } - - boolean isCoordinator() { - return isCoordinator; - } - boolean isTrinoStarted() { if (isStarted) { return true; @@ -121,14 +73,11 @@ public class CatalogRegister { * Initializes the catalog register with the specified Trino connector context and Gravitino * configuration. * - * @param context the Trino connector context * @param config the Gravitino configuration * @throws Exception if the catalog register fails to initialize */ - public void init(ConnectorContext context, GravitinoConfig config) throws Exception { + public void init(GravitinoConfig config) throws Exception { this.config = config; - checkTrinoSpiVersion(context); - checkSupportCatalogNameWithMetalake(context, config); TrinoDriver driver = new TrinoDriver(); DriverManager.registerDriver(driver); @@ -203,6 +152,8 @@ public class CatalogRegister { String createCatalogCommand = generateCreateCatalogCommand(name, catalog); executeSql(createCatalogCommand); LOG.info("Register catalog {} successfully: {}", name, createCatalogCommand); + } catch (SQLException e) { + throw new TrinoException(GravitinoErrorCode.GRAVITINO_RUNTIME_ERROR, e.getMessage(), e); } catch (Exception e) { String message = String.format("Failed to register catalog %s", name); LOG.error(message); @@ -211,7 +162,7 @@ public class CatalogRegister { } private boolean checkCatalogExist(String name) { - String showCatalogCommand = String.format("SHOW CATALOGS like '%s'", name); + String showCatalogCommand = "SHOW CATALOGS"; Exception failedException = null; try { int retries = EXECUTE_QUERY_MAX_RETRIES; @@ -222,11 +173,17 @@ public class CatalogRegister { ResultSet rs = statement.getResultSet(); while (rs.next()) { String catalogName = rs.getString(1); - if (catalogName.equals(name) || catalogName.equals("\"" + name + "\"")) { + // In some Trino version catalog name may be quoted, so we need to check both quoted and + // unquoted names + if (name.equals(catalogName) + || name.equals("\"" + catalogName + "\"") + || ("\"" + name + "\"").equals(catalogName)) { return true; } } return false; + } catch (SQLException e) { + throw e; } catch (Exception e) { failedException = e; LOG.warn("Execute command failed: {}, ", showCatalogCommand, e); @@ -252,6 +209,8 @@ public class CatalogRegister { // check the catalog is already created statement.execute(sql); return; + } catch (SQLException e) { + throw e; } catch (Exception e) { failedException = e; LOG.warn("Execute command failed: {}, ", sql, e); diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/system/GravitinoSystemConnector.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/system/GravitinoSystemConnector.java index c6b894170c..57ee95c0ea 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/system/GravitinoSystemConnector.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/system/GravitinoSystemConnector.java @@ -18,10 +18,13 @@ */ package org.apache.gravitino.trino.connector.system; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; + import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.trino.spi.HostAddress; import io.trino.spi.Page; +import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorMetadata; @@ -79,16 +82,30 @@ public class GravitinoSystemConnector implements Connector { @Override public ConnectorMetadata getMetadata( ConnectorSession session, ConnectorTransactionHandle transactionHandle) { + return createMetadata(); + } + + protected ConnectorMetadata createMetadata() { return new GravitinoSystemConnectorMetadata(); } @Override public ConnectorSplitManager getSplitManager() { - return new SplitManager(); + return createSplitManager(); } @Override public ConnectorPageSourceProvider getPageSourceProvider() { + return createPageSourceProvider(); + } + + public void shutdown() {} + + protected ConnectorSplitManager createSplitManager() { + return new SplitManager(); + } + + protected ConnectorPageSourceProvider createPageSourceProvider() { return new DatasourceProvider(); } @@ -112,7 +129,11 @@ public class GravitinoSystemConnector implements Connector { SchemaTableName tableName = ((GravitinoSystemConnectorMetadata.SystemTableHandle) table).getName(); - return new SystemTablePageSource(GravitinoSystemTableFactory.loadPageData(tableName)); + return createPageSource(GravitinoSystemTableFactory.loadPageData(tableName)); + } + + protected ConnectorPageSource createPageSource(Page page) { + throw new TrinoException(NOT_SUPPORTED, "Should be overridden in subclass"); } } @@ -129,13 +150,18 @@ public class GravitinoSystemConnector implements Connector { SchemaTableName tableName = ((GravitinoSystemConnectorMetadata.SystemTableHandle) connectorTableHandle).getName(); - return new FixedSplitSource(new Split(tableName)); + return new FixedSplitSource(createSplit(tableName)); + } + + protected ConnectorSplit createSplit(SchemaTableName tableName) { + throw new TrinoException(NOT_SUPPORTED, "Should be overridden in subclass"); } } /** The split. */ - public static class Split implements ConnectorSplit { - private final SchemaTableName tableName; + public abstract static class Split implements ConnectorSplit { + + protected final SchemaTableName tableName; /** * Constructs a new Split with the specified table name. @@ -166,18 +192,13 @@ public class GravitinoSystemConnector implements Connector { public List<HostAddress> getAddresses() { return Collections.emptyList(); } - - @Override - public Object getInfo() { - return this; - } } /** The system table page source. */ - public static class SystemTablePageSource implements ConnectorPageSource { + public abstract static class SystemTablePageSource implements ConnectorPageSource { - private boolean isFinished = false; - private final Page page; + protected boolean isFinished = false; + protected final Page page; /** * Constructs a new SystemTablePageSource. @@ -203,8 +224,7 @@ public class GravitinoSystemConnector implements Connector { return isFinished; } - @Override - public Page getNextPage() { + public Page nextPage() { if (isFinished) { return null; } diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/BlockJsonSerde.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/BlockJsonSerde.java index 2ed83b1be8..e100665668 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/BlockJsonSerde.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/BlockJsonSerde.java @@ -37,6 +37,7 @@ import java.lang.reflect.Method; */ public final class BlockJsonSerde { private static final String BLOCK_SERDE_UTIL_CLASS_NAME = "io.trino.block.BlockSerdeUtil"; + private static final int DEFAULT_BLOCK_ENCODING_NAME_LENGTH = 64; /** * Jackson serializer for Trino {@link Block} objects. Serializes block instances using Trino's @@ -65,11 +66,13 @@ public final class BlockJsonSerde { public void serialize( Block block, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { - // Encoding name is length prefixed as are many block encodings - SliceOutput output = - new DynamicSliceOutput( - toIntExact( - block.getSizeInBytes() + block.getEncodingName().length() + (2 * Integer.BYTES))); + // estimate size to avoid multiple resizes, encoding name is length prefixed as are many block + // like : SIZE_OF_INT + blockEncoding.getName().length() + block.getSizeInBytes(); + // we use a default length for encoding name to avoid calculating actual length + int estimatedSize = + toIntExact( + block.getSizeInBytes() + (2 * Integer.BYTES) + DEFAULT_BLOCK_ENCODING_NAME_LENGTH); + SliceOutput output = new DynamicSliceOutput(estimatedSize); try { writeBlock.invoke(null, blockEncodingSerde, output, block); diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/AbstractGravitinoConnectorTest.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/AbstractGravitinoConnectorTest.java new file mode 100644 index 0000000000..ac0d223cb3 --- /dev/null +++ b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/AbstractGravitinoConnectorTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.trino.connector; + +import io.trino.plugin.memory.MemoryPlugin; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.QueryRunner; +import java.util.concurrent.TimeUnit; +import org.apache.gravitino.client.GravitinoAdminClient; +import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager; +import org.awaitility.Awaitility; + +abstract class AbstractGravitinoConnectorTest extends AbstractTestQueryFramework { + + GravitinoMockServer server; + int trinoVersion; + + @Override + protected QueryRunner createQueryRunner() throws Exception { + GravitinoAdminClient gravitinoClient = createGravitinoClient(); + try { + + DistributedQueryRunner queryRunner = createTrinoQueryRunner(); + + GravitinoPlugin gravitinoPlugin = createGravitinoPlugin(gravitinoClient); + queryRunner.installPlugin(gravitinoPlugin); + + configureCatalogs(queryRunner, gravitinoClient); + + GravitinoConnectorPluginManager.instance(this.getClass().getClassLoader()) + .installPlugin("memory", new MemoryPlugin()); + CatalogConnectorManager catalogConnectorManager = + gravitinoPlugin.getCatalogConnectorManager(); + trinoVersion = gravitinoPlugin.getTrinoVersion(); + server.setCatalogConnectorManager(catalogConnectorManager, trinoVersion); + + // Wait for the catalog to be created. Wait for at least 30 seconds. + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .until(() -> !catalogConnectorManager.getCatalogs().isEmpty()); + + return queryRunner; + } catch (Exception e) { + throw new RuntimeException("Create query runner failed", e); + } + } + + protected GravitinoPlugin createGravitinoPlugin(GravitinoAdminClient client) { + return new GravitinoPlugin(client); + } + + protected abstract DistributedQueryRunner createTrinoQueryRunner() throws Exception; + + protected GravitinoAdminClient createGravitinoClient() { + server = closeAfterClass(new GravitinoMockServer()); + return server.createGravitinoClient(); + } + + protected abstract void configureCatalogs( + DistributedQueryRunner queryRunner, GravitinoAdminClient gravitinoClient) throws Exception; +} diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/GravitinoMockServer.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/GravitinoMockServer.java index b76eea855f..2f8097f88b 100644 --- a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/GravitinoMockServer.java +++ b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/GravitinoMockServer.java @@ -19,6 +19,7 @@ package org.apache.gravitino.trino.connector; import static java.util.Collections.emptyMap; +import static org.apache.gravitino.trino.connector.TestGravitinoConnector.SPI_VERSION_SUPPORT_ADD_COLUMN_WITH_POSITION; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyMap; @@ -30,7 +31,9 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import io.trino.plugin.memory.MemoryConnector; import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.ConnectorTableMetadata; import io.trino.spi.connector.SaveMode; @@ -79,19 +82,32 @@ public class GravitinoMockServer implements AutoCloseable { private boolean start = true; CatalogConnectorManager catalogConnectorManager; private GeneralDataTypeTransformer dataTypeTransformer = new HiveDataTypeTransformer(); + private int trinoVersion = 0; public GravitinoMockServer() { createMetalake(testMetalake); createCatalog(testMetalake, testCatalog, ImmutableMap.of()); } - public void setCatalogConnectorManager(CatalogConnectorManager catalogConnectorManager) { + public void setCatalogConnectorManager( + CatalogConnectorManager catalogConnectorManager, int trinoVersion) { this.catalogConnectorManager = catalogConnectorManager; + this.trinoVersion = trinoVersion; } public GravitinoAdminClient createGravitinoClient() { GravitinoAdminClient client = mock(GravitinoAdminClient.class); + when(client.listMetalakes()) + .thenAnswer( + new Answer<GravitinoMetalake[]>() { + @Override + public GravitinoMetalake[] answer(InvocationOnMock invocation) throws Throwable { + return metalakes.values().stream() + .map(metalake -> metalake.metalake) + .toArray(GravitinoMetalake[]::new); + } + }); when(client.createMetalake(anyString(), anyString(), anyMap())) .thenAnswer( new Answer<GravitinoMetalake>() { @@ -558,7 +574,7 @@ public class GravitinoMockServer implements AutoCloseable { catalogConnectorManager .getCatalogConnector(catalogConnectorManager.getTrinoCatalogName(catalog)) .getMetadataAdapter(); - metadata.addColumn(null, tableHandle, metadataAdapter.getColumnMetadata(column)); + addColumn(metadata, tableHandle, metadataAdapter.getColumnMetadata(column)); } else if (tableChange instanceof TableChange.DeleteColumn) { TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) tableChange; @@ -603,6 +619,42 @@ public class GravitinoMockServer implements AutoCloseable { } } + private void addColumn( + ConnectorMetadata metadata, ConnectorTableHandle tableHandle, ColumnMetadata columnMetadata) { + if (trinoVersion < SPI_VERSION_SUPPORT_ADD_COLUMN_WITH_POSITION) { + try { + metadata + .getClass() + .getMethod( + "addColumn", + ConnectorSession.class, + ConnectorTableHandle.class, + ColumnMetadata.class) + .invoke(metadata, null, tableHandle, columnMetadata); + } catch (ReflectiveOperationException e) { + throw new RuntimeException("Failed to invoke legacy addColumn by reflection", e); + } + } else { + try { + Class<?> columnPositionClass = Class.forName("io.trino.spi.connector.ColumnPosition"); + Class<?> lastPositionClass = Class.forName("io.trino.spi.connector.ColumnPosition$Last"); + Object position = lastPositionClass.getDeclaredConstructor().newInstance(); + + metadata + .getClass() + .getMethod( + "addColumn", + ConnectorSession.class, + ConnectorTableHandle.class, + ColumnMetadata.class, + columnPositionClass) + .invoke(metadata, null, tableHandle, columnMetadata, position); + } catch (Exception e) { + throw new RuntimeException("Failed to invoke addColumn with ColumnPosition", e); + } + } + } + @ResourcePresence public boolean isRunning() { return start; diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestCreateGravitinoConnector.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestCreateGravitinoConnector.java deleted file mode 100644 index 2845d3c604..0000000000 --- a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestCreateGravitinoConnector.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.gravitino.trino.connector; - -import static io.trino.testing.TestingSession.testSessionBuilder; -import static org.assertj.core.api.Assertions.assertThat; - -import io.trino.Session; -import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.QueryRunner; -import java.util.HashMap; -import org.apache.gravitino.client.GravitinoAdminClient; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; - -@Disabled -public class TestCreateGravitinoConnector { - - GravitinoMockServer server; - - @Test - public void testCreateConnectorsWithEnableSimpleCatalog() throws Exception { - server = new GravitinoMockServer(); - Session session = testSessionBuilder().setCatalog("gravitino").build(); - QueryRunner queryRunner = DistributedQueryRunner.builder(session).setNodeCount(1).build(); - - GravitinoAdminClient gravitinoClient = server.createGravitinoClient(); - TestGravitinoPlugin gravitinoPlugin = new TestGravitinoPlugin(gravitinoClient); - queryRunner.installPlugin(gravitinoPlugin); - - // test create two connector and set gravitino.simplify-catalog-names = true - { - // create a gravitino connector named gravitino using metalake test - HashMap<String, String> properties = new HashMap<>(); - properties.put("gravitino.metalake", "test"); - properties.put("gravitino.uri", "http://127.0.0.1:8090"); - properties.put("gravitino.simplify-catalog-names", "true"); - queryRunner.createCatalog("test0", "gravitino", properties); - } - - { - // Test failed to create catalog with different metalake - HashMap<String, String> properties = new HashMap<>(); - properties.put("gravitino.metalake", "test1"); - properties.put("gravitino.uri", "http://127.0.0.1:8090"); - properties.put("gravitino.simplify-catalog-names", "true"); - try { - queryRunner.createCatalog("test1", "gravitino", properties); - } catch (Exception e) { - assertThat(e.getMessage()).contains("Multiple metalakes are not supported"); - } - } - - server.close(); - } - - @Test - public void testCreateConnectorsWithDisableSimpleCatalog() throws Exception { - server = new GravitinoMockServer(); - Session session = testSessionBuilder().setCatalog("gravitino").build(); - QueryRunner queryRunner = DistributedQueryRunner.builder(session).setNodeCount(1).build(); - - GravitinoAdminClient gravitinoClient = server.createGravitinoClient(); - TestGravitinoPlugin gravitinoPlugin = new TestGravitinoPlugin(gravitinoClient); - queryRunner.installPlugin(gravitinoPlugin); - - // test create two connector and set gravitino.simplify-catalog-names = false - { - // create a gravitino connector named gravitino using metalake test - HashMap<String, String> properties = new HashMap<>(); - properties.put("gravitino.metalake", "test"); - properties.put("gravitino.uri", "http://127.0.0.1:8090"); - properties.put("gravitino.simplify-catalog-names", "false"); - queryRunner.createCatalog("test0", "gravitino", properties); - } - - { - // Test failed to create catalog with different metalake - HashMap<String, String> properties = new HashMap<>(); - properties.put("gravitino.metalake", "test1"); - properties.put("gravitino.uri", "http://127.0.0.1:8090"); - properties.put("gravitino.simplify-catalog-names", "false"); - queryRunner.createCatalog("test1", "gravitino", properties); - } - - server.close(); - } -} diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConfig.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConfig.java index e4a8c9e5fe..f79fa1bfe2 100644 --- a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConfig.java +++ b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConfig.java @@ -28,18 +28,10 @@ import com.google.common.collect.ImmutableMap; import io.trino.spi.TrinoException; import java.util.Map; import java.util.regex.Pattern; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.jupiter.api.Test; public class TestGravitinoConfig { - @BeforeClass - public static void startup() throws Exception {} - - @AfterClass - public static void shutdown() throws Exception {} - @Test public void testGravitinoConfig() { String gravitinoUrl = "http://127.0.0.1:8000"; diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnector.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnector.java index 30ac739009..99a7f803ac 100644 --- a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnector.java +++ b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnector.java @@ -18,72 +18,37 @@ */ package org.apache.gravitino.trino.connector; -import static io.trino.testing.TestingSession.testSessionBuilder; import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import com.google.common.base.Preconditions; -import io.trino.Session; -import io.trino.plugin.memory.MemoryPlugin; -import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.MaterializedResult; import io.trino.testing.MaterializedRow; -import io.trino.testing.QueryRunner; import java.util.HashMap; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.gravitino.client.GravitinoAdminClient; -import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager; -import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; -public class TestGravitinoConnector extends AbstractTestQueryFramework { +public abstract class TestGravitinoConnector extends AbstractGravitinoConnectorTest { - GravitinoMockServer server; + public static final int SPI_VERSION_TEST_SUPPORT_RENAME_COLUMN = 452; + public static final int SPI_VERSION_SUPPORT_ADD_COLUMN_WITH_POSITION = 468; + public static final int SPI_VERSION_TEST_SUPPORT_ADD_COLUMN = 469; @Override - protected QueryRunner createQueryRunner() throws Exception { - server = closeAfterClass(new GravitinoMockServer()); - GravitinoAdminClient gravitinoClient = server.createGravitinoClient(); - - Session session = testSessionBuilder().setCatalog("gravitino").build(); - try { - - DistributedQueryRunner queryRunner = - DistributedQueryRunner.builder(session).setNodeCount(1).build(); - - TestGravitinoPlugin gravitinoPlugin = new TestGravitinoPlugin(gravitinoClient); - queryRunner.installPlugin(gravitinoPlugin); - - // create a gravitino connector named gravitino using metalake test - HashMap<String, String> properties = new HashMap<>(); - properties.put("gravitino.metalake", "test"); - properties.put("gravitino.uri", "http://127.0.0.1:8090"); - properties.put( - "catalog.config-dir", queryRunner.getCoordinator().getBaseDataDir().toString()); - properties.put("discovery.uri", queryRunner.getCoordinator().getBaseUrl().toString()); - queryRunner.createCatalog("gravitino", "gravitino", properties); - - GravitinoConnectorPluginManager.instance(this.getClass().getClassLoader()) - .installPlugin("memory", new MemoryPlugin()); - CatalogConnectorManager catalogConnectorManager = - gravitinoPlugin.getCatalogConnectorManager(); - server.setCatalogConnectorManager(catalogConnectorManager); - - // Wait for the catalog to be created. Wait for at least 30 seconds. - Awaitility.await() - .atMost(30, TimeUnit.SECONDS) - .pollInterval(1, TimeUnit.SECONDS) - .until(() -> !catalogConnectorManager.getCatalogs().isEmpty()); - - return queryRunner; - } catch (Exception e) { - throw new RuntimeException("Create query runner failed", e); - } + protected void configureCatalogs( + DistributedQueryRunner queryRunner, GravitinoAdminClient gravitinoClient) { + // create a gravitino connector named gravitino using metalake test + HashMap<String, String> properties = new HashMap<>(); + properties.put("gravitino.metalake", "test"); + properties.put("gravitino.uri", "http://127.0.0.1:8090"); + properties.put("catalog.config-dir", queryRunner.getCoordinator().getBaseDataDir().toString()); + properties.put("discovery.uri", queryRunner.getCoordinator().getBaseUrl().toString()); + queryRunner.createCatalog("gravitino", "gravitino", properties); } @Test @@ -189,38 +154,51 @@ public class TestGravitinoConnector extends AbstractTestQueryFramework { createTestTable(fullTableName1); - // test add column and drop column, but the memory connector is not supported these operations. - assertQueryFails( - String.format("alter table %s add column if not exists c varchar", fullTableName1), - format("This connector does not support adding columns")); - - assertQueryFails( - String.format("alter table %s drop column a", fullTableName1), - format("This connector does not support dropping columns")); - // test set table comment assertUpdate(String.format("comment on table %s is 'test table comments'", fullTableName1)); assertThat((String) computeScalar("show create table " + fullTableName1)) .contains("COMMENT 'test table comments'"); - // test rename column, but the memory connector is not supported these operations. - assertQueryFails( - String.format("alter table %s rename column a to c ", fullTableName1), - format("This connector does not support renaming columns")); - - assertQueryFails( - String.format("alter table %s alter column a set DATA TYPE int", fullTableName1), - format("This connector does not support setting column types")); - // test set column comment assertUpdate(String.format("comment on column %s.a is 'test column comments'", fullTableName1)); assertThat((String) computeScalar("show create table " + fullTableName1)) .contains("COMMENT 'test column comments'"); + // test add column and drop column, but the memory connector is not supported these operations. + if (trinoVersion < SPI_VERSION_TEST_SUPPORT_ADD_COLUMN) { + assertQueryFails( + String.format("alter table %s add column if not exists c varchar", fullTableName1), + "This connector does not support adding columns"); + } else { + assertUpdate( + String.format("alter table %s add column if not exists c varchar", fullTableName1)); + assertThat((String) computeScalar("show create table " + fullTableName1)) + .contains("c varchar"); + } + + assertQueryFails( + String.format("alter table %s drop column a", fullTableName1), + "This connector does not support dropping columns"); + + // test rename column, but the memory connector is not supported these operations. + if (trinoVersion < SPI_VERSION_TEST_SUPPORT_RENAME_COLUMN) { + assertQueryFails( + String.format("alter table %s rename column b to d ", fullTableName1), + "This connector does not support renaming columns"); + } else { + assertUpdate(String.format("alter table %s rename column b to d ", fullTableName1)); + assertThat((String) computeScalar("show create table " + fullTableName1)) + .contains("d integer"); + } + + assertQueryFails( + String.format("alter table %s alter column a set DATA TYPE int", fullTableName1), + "This connector does not support setting column types"); + // test set table properties, but the memory connector is not supported these operations. assertQueryFails( String.format("alter table %s set properties \"max_ttl\" = 20", fullTableName1), - format("This connector does not support setting table properties")); + "This connector does not support setting table properties"); dropTestTable(fullTableName1); } diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorFactory.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorFactory.java deleted file mode 100644 index 8b802d53b0..0000000000 --- a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.gravitino.trino.connector; - -import java.util.function.Supplier; -import org.apache.gravitino.client.GravitinoAdminClient; - -public class TestGravitinoConnectorFactory extends GravitinoConnectorFactory { - private GravitinoAdminClient gravitinoClient; - - public void setGravitinoClient(GravitinoAdminClient client) { - this.gravitinoClient = client; - } - - @Override - Supplier<GravitinoAdminClient> clientProvider() { - return () -> gravitinoClient; - } -} diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorNullChecks.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorNullChecks.java index bf69565b88..1c1bbcb8b6 100644 --- a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorNullChecks.java +++ b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorNullChecks.java @@ -26,16 +26,27 @@ import static org.mockito.Mockito.when; import io.trino.spi.connector.Connector; import io.trino.spi.transaction.IsolationLevel; +import org.apache.gravitino.Catalog; import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.SupportsSchemas; +import org.apache.gravitino.client.GravitinoMetalake; +import org.apache.gravitino.rel.TableCatalog; import org.apache.gravitino.trino.connector.catalog.CatalogConnectorContext; +import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog; import org.junit.jupiter.api.Test; class TestGravitinoConnectorNullChecks { @Test void testBeginTransactionThrowsIfInternalConnectorIsNull() { + GravitinoCatalog mockCatalog = mock(GravitinoCatalog.class); + when(mockCatalog.geNameIdentifier()).thenReturn(NameIdentifier.of("metalake", "catalog")); + CatalogConnectorContext mockContext = mock(CatalogConnectorContext.class); + when(mockContext.getCatalog()).thenReturn(mockCatalog); + GravitinoMetalake metalake = mockMetalake(); + when(mockContext.getMetalake()).thenReturn(metalake); when(mockContext.getInternalConnector()).thenReturn(null); - GravitinoConnector connector = new GravitinoConnector(mock(NameIdentifier.class), mockContext); + GravitinoConnector connector = new GravitinoConnector(mockContext); assertThrows( IllegalArgumentException.class, () -> connector.beginTransaction(IsolationLevel.READ_COMMITTED, true, true)); @@ -43,14 +54,30 @@ class TestGravitinoConnectorNullChecks { @Test void testBeginTransactionThrowsIfInternalTransactionHandleIsNull() { + GravitinoCatalog mockCatalog = mock(GravitinoCatalog.class); + when(mockCatalog.geNameIdentifier()).thenReturn(NameIdentifier.of("metalake", "catalog")); + CatalogConnectorContext mockContext = mock(CatalogConnectorContext.class); + when(mockContext.getCatalog()).thenReturn(mockCatalog); + GravitinoMetalake metalake = mockMetalake(); + when(mockContext.getMetalake()).thenReturn(metalake); + Connector mockInternalConnector = mock(Connector.class); when(mockContext.getInternalConnector()).thenReturn(mockInternalConnector); when(mockInternalConnector.beginTransaction(any(), anyBoolean(), anyBoolean())) .thenReturn(null); - GravitinoConnector connector = new GravitinoConnector(mock(NameIdentifier.class), mockContext); + GravitinoConnector connector = new GravitinoConnector(mockContext); assertThrows( IllegalArgumentException.class, () -> connector.beginTransaction(IsolationLevel.READ_COMMITTED, true, true)); } + + private static GravitinoMetalake mockMetalake() { + GravitinoMetalake metalake = mock(GravitinoMetalake.class); + Catalog catalog = mock(Catalog.class); + when(catalog.asSchemas()).thenReturn(mock(SupportsSchemas.class)); + when(catalog.asTableCatalog()).thenReturn(mock(TableCatalog.class)); + when(metalake.loadCatalog(any())).thenReturn(catalog); + return metalake; + } } diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithMetalakeCatalogName.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithMetalakeCatalogName.java index c60c5910b8..b1cef8e4cc 100644 --- a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithMetalakeCatalogName.java +++ b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithMetalakeCatalogName.java @@ -18,91 +18,43 @@ */ package org.apache.gravitino.trino.connector; -import static io.trino.testing.TestingSession.testSessionBuilder; import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; -import io.trino.Session; -import io.trino.plugin.memory.MemoryPlugin; -import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.MaterializedResult; import io.trino.testing.MaterializedRow; -import io.trino.testing.QueryRunner; import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.concurrent.TimeUnit; import org.apache.gravitino.client.GravitinoAdminClient; -import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager; -import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -@Disabled -public class TestGravitinoConnectorWithMetalakeCatalogName extends AbstractTestQueryFramework { - - GravitinoMockServer server; +public abstract class TestGravitinoConnectorWithMetalakeCatalogName + extends AbstractGravitinoConnectorTest { @Override - protected QueryRunner createQueryRunner() throws Exception { - server = closeAfterClass(new GravitinoMockServer()); - GravitinoAdminClient gravitinoClient = server.createGravitinoClient(); - - Session session = testSessionBuilder().setCatalog("gravitino").build(); - - try { - DistributedQueryRunner queryRunner = - DistributedQueryRunner.builder(session).setNodeCount(1).build(); - - TestGravitinoPlugin gravitinoPlugin = new TestGravitinoPlugin(gravitinoClient); - queryRunner.installPlugin(gravitinoPlugin); - - { - // create a gravitino connector named gravitino using metalake test - HashMap<String, String> properties = new HashMap<>(); - properties.put("gravitino.metalake", "test"); - properties.put("gravitino.uri", "http://127.0.0.1:8090"); - properties.put("gravitino.simplify-catalog-names", "false"); - properties.put( - "trino.catalog.store", queryRunner.getCoordinator().getBaseDataDir().toString()); - properties.put( - "trino.jdbc.uri", - queryRunner.getCoordinator().getBaseUrl().toString().replace("http", "jdbc:trino")); - queryRunner.createCatalog("gravitino", "gravitino", properties); - } - - { - // create a gravitino connector named test1 using metalake gravitino1 - HashMap<String, String> properties = new HashMap<>(); - properties.put("gravitino.metalake", "test1"); - properties.put("gravitino.uri", "http://127.0.0.1:8090"); - properties.put("gravitino.simplify-catalog-names", "false"); - properties.put( - "trino.catalog.store", queryRunner.getCoordinator().getBaseDataDir().toString()); - properties.put( - "trino.jdbc.uri", - queryRunner.getCoordinator().getBaseUrl().toString().replace("http", "jdbc:trino")); - queryRunner.createCatalog("gravitino1", "gravitino", properties); - } - - GravitinoConnectorPluginManager.instance(this.getClass().getClassLoader()) - .installPlugin("memory", new MemoryPlugin()); - CatalogConnectorManager catalogConnectorManager = - gravitinoPlugin.getCatalogConnectorManager(); - server.setCatalogConnectorManager(catalogConnectorManager); - - // Wait for the catalog to be created. Wait for at least 30 seconds. - Awaitility.await() - .atMost(30, TimeUnit.SECONDS) - .pollInterval(1, TimeUnit.SECONDS) - .until(() -> !catalogConnectorManager.getCatalogs().isEmpty()); - return queryRunner; - - } catch (Exception e) { - throw new RuntimeException("Create query runner failed", e); - } + protected void configureCatalogs( + DistributedQueryRunner queryRunner, GravitinoAdminClient gravitinoClient) { + // create a gravitino connector named gravitino using metalake test + HashMap<String, String> properties = new HashMap<>(); + properties.put("gravitino.metalake", "test"); + properties.put("gravitino.uri", "http://127.0.0.1:8090"); + properties.put("gravitino.use-single-metalake", "false"); + properties.put("catalog.config-dir", queryRunner.getCoordinator().getBaseDataDir().toString()); + properties.put("discovery.uri", queryRunner.getCoordinator().getBaseUrl().toString()); + queryRunner.createCatalog("gravitino", "gravitino", properties); + + // create a gravitino connector named test1 using metalake gravitino1 + HashMap<String, String> secondProperties = new HashMap<>(); + secondProperties.put("gravitino.metalake", "test1"); + secondProperties.put("gravitino.uri", "http://127.0.0.1:8090"); + secondProperties.put("gravitino.use-single-metalake", "false"); + secondProperties.put( + "catalog.config-dir", queryRunner.getCoordinator().getBaseDataDir().toString()); + secondProperties.put("discovery.uri", queryRunner.getCoordinator().getBaseUrl().toString()); + queryRunner.createCatalog("gravitino1", "gravitino", secondProperties); } @Test @@ -121,42 +73,49 @@ public class TestGravitinoConnectorWithMetalakeCatalogName extends AbstractTestQ // testing the catalogs assertThat(computeActual("show catalogs").getOnlyColumnAsSet()).contains("gravitino"); assertThat(computeActual("show catalogs").getOnlyColumnAsSet()).contains("gravitino1"); - assertThat(computeActual("show catalogs").getOnlyColumnAsSet()).contains("\"test.memory\""); + assertThat(computeActual("show catalogs").getOnlyColumnAsSet()) + .contains(getTrinoCliCatalogName("test", "memory")); // testing the gravitino connector framework works. assertThat(computeActual("select * from system.jdbc.tables").getRowCount()).isGreaterThan(1); // test metalake named test. the connector name is gravitino assertUpdate("call gravitino.system.create_catalog('memory1', 'memory', Map())"); - assertThat(computeActual("show catalogs").getOnlyColumnAsSet()).contains("\"test.memory1\""); + assertThat(computeActual("show catalogs").getOnlyColumnAsSet()) + .contains(getTrinoCliCatalogName("test", "memory1")); assertUpdate("call gravitino.system.drop_catalog('memory1')"); assertThat(computeActual("show catalogs").getOnlyColumnAsSet()) - .doesNotContain("\"test.memory1\""); + .doesNotContain(getTrinoCliCatalogName("test", "memory1")); assertUpdate( "call gravitino.system.create_catalog(" + "catalog=>'memory1', provider=>'memory', properties => Map(array['max_ttl'], array['10']), ignore_exist => true)"); - assertThat(computeActual("show catalogs").getOnlyColumnAsSet()).contains("\"test.memory1\""); + assertThat(computeActual("show catalogs").getOnlyColumnAsSet()) + .contains(getTrinoCliCatalogName("test", "memory1")); assertUpdate( "call gravitino.system.drop_catalog(catalog => 'memory1', ignore_not_exist => true)"); assertThat(computeActual("show catalogs").getOnlyColumnAsSet()) - .doesNotContain("\"test.memory1\""); + .doesNotContain(getTrinoCliCatalogName("test", "memory1")); // test metalake named test1. the connector name is gravitino1 GravitinoAdminClient gravitinoClient = server.createGravitinoClient(); gravitinoClient.createMetalake("test1", "", Collections.emptyMap()); assertUpdate("call gravitino1.system.create_catalog('memory1', 'memory', Map())"); - assertThat(computeActual("show catalogs").getOnlyColumnAsSet()).contains("\"test1.memory1\""); + assertThat(computeActual("show catalogs").getOnlyColumnAsSet()) + .contains(getTrinoCliCatalogName("test1", "memory1")); assertUpdate("call gravitino1.system.drop_catalog('memory1')"); assertThat(computeActual("show catalogs").getOnlyColumnAsSet()) - .doesNotContain("\"test1.memory1\""); + .doesNotContain(getTrinoCliCatalogName("test1", "memory1")); } @Test public void testCreateTable() { - String fullSchemaName = "\"test.memory\".db_01"; + assertThat(computeActual("show catalogs").getOnlyColumnAsSet()) + .contains(getTrinoCliCatalogName("test", "memory")); + + String fullSchemaName = getTrinoSqlCatalogName("test", "memory") + ".db_01"; String tableName = "tb_01"; String fullTableName = fullSchemaName + "." + tableName; @@ -178,4 +137,12 @@ public class TestGravitinoConnectorWithMetalakeCatalogName extends AbstractTestQ assertUpdate("drop table " + fullTableName); assertUpdate("drop schema " + fullSchemaName); } + + protected String getTrinoCliCatalogName(String metalakeName, String catalogName) { + return "\"" + metalakeName + "." + catalogName + "\""; + } + + protected String getTrinoSqlCatalogName(String metalakeName, String catalogName) { + return "\"\"\"" + metalakeName + "." + catalogName + "\"\"\""; + } } diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithSkipCatalog.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithSkipCatalog.java deleted file mode 100644 index c3ad8f63c4..0000000000 --- a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithSkipCatalog.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.gravitino.trino.connector; - -import static io.trino.testing.TestingSession.testSessionBuilder; -import static org.apache.gravitino.Catalog.Type.RELATIONAL; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import io.trino.Session; -import io.trino.plugin.memory.MemoryPlugin; -import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.MaterializedResult; -import io.trino.testing.QueryRunner; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import org.apache.gravitino.client.GravitinoAdminClient; -import org.apache.gravitino.client.GravitinoMetalake; -import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager; -import org.awaitility.Awaitility; -import org.junit.jupiter.api.Test; - -public class TestGravitinoConnectorWithSkipCatalog extends AbstractTestQueryFramework { - - GravitinoMockServer server; - - @Override - protected QueryRunner createQueryRunner() throws Exception { - GravitinoAdminClient gravitinoClient = initGravitinoMockServer(); - Session session = testSessionBuilder().setCatalog("gravitino").build(); - - try { - DistributedQueryRunner queryRunner = - DistributedQueryRunner.builder(session).setNodeCount(1).build(); - - TestGravitinoPlugin gravitinoPlugin = new TestGravitinoPlugin(gravitinoClient); - queryRunner.installPlugin(gravitinoPlugin); - - { - // create a gravitino connector with single metalake - HashMap<String, String> properties = new HashMap<>(); - properties.put("gravitino.metalake", "test1"); - properties.put("gravitino.uri", "http://127.0.0.1:8090"); - properties.put("gravitino.trino.skip-catalog-patterns", "a.*, b1"); - properties.put( - "catalog.config-dir", queryRunner.getCoordinator().getBaseDataDir().toString()); - properties.put("discovery.uri", queryRunner.getCoordinator().getBaseUrl().toString()); - queryRunner.createCatalog("gravitino", "gravitino", properties); - } - - GravitinoConnectorPluginManager.instance(this.getClass().getClassLoader()) - .installPlugin("memory", new MemoryPlugin()); - CatalogConnectorManager catalogConnectorManager = - gravitinoPlugin.getCatalogConnectorManager(); - server.setCatalogConnectorManager(catalogConnectorManager); - - // Wait for the catalog to be created. Wait for at least 30 seconds. - Awaitility.await() - .atMost(30, TimeUnit.SECONDS) - .pollInterval(1, TimeUnit.SECONDS) - .until(() -> !catalogConnectorManager.getCatalogs().isEmpty()); - return queryRunner; - - } catch (Exception e) { - throw new RuntimeException("Create query runner failed", e); - } - } - - @Test - public void testShowCatalogsFilteredBySkipPatterns() throws Exception { - MaterializedResult expectedResult = computeActual("show catalogs"); - assertEquals(expectedResult.getMaterializedRows().size(), 3); - List<String> catalogs = - expectedResult.getMaterializedRows().stream() - .map(row -> (String) row.getField(0)) - .collect(Collectors.toList()); - assertFalse(catalogs.contains("a1")); - assertFalse(catalogs.contains("b1")); - assertTrue(catalogs.contains("b2")); - } - - @Test - public void testSystemTableQueryFilteredBySkipPatterns() throws Exception { - MaterializedResult expectedResult = computeActual("select * from gravitino.system.catalog"); - assertEquals(expectedResult.getMaterializedRows().size(), 1); - List<String> catalogs = - expectedResult.getMaterializedRows().stream() - .map(row -> (String) row.getField(0)) - .collect(Collectors.toList()); - assertFalse(catalogs.contains("a1")); - assertFalse(catalogs.contains("b1")); - assertTrue(catalogs.contains("b2")); - } - - private GravitinoAdminClient initGravitinoMockServer() { - GravitinoMockServer gravitinoMockServer = new GravitinoMockServer(); - server = closeAfterClass(gravitinoMockServer); - GravitinoAdminClient gravitinoClient = server.createGravitinoClient(); - - gravitinoClient.createMetalake("test1", "", Collections.emptyMap()); - GravitinoMetalake metalake = gravitinoClient.loadMetalake("test1"); - metalake.createCatalog("a1", RELATIONAL, "", "", Collections.emptyMap()); - metalake.createCatalog("b1", RELATIONAL, "", "", Collections.emptyMap()); - metalake.createCatalog("b2", RELATIONAL, "", "", Collections.emptyMap()); - return gravitinoClient; - } -} diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataGetSystemTable.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataGetSystemTable.java index cfd846b05b..e3c4403336 100644 --- a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataGetSystemTable.java +++ b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataGetSystemTable.java @@ -51,7 +51,7 @@ public class TestGravitinoMetadataGetSystemTable { .thenReturn(Optional.of(mockSystemTable)); GravitinoMetadata gravitinoMetadata = - new GravitinoMetadata(catalogConnectorMetadata, metadataAdapter, internalMetadata); + new TestGravitinoMetadata(catalogConnectorMetadata, metadataAdapter, internalMetadata); Optional<SystemTable> result = gravitinoMetadata.getSystemTable(session, tableName); @@ -73,11 +73,20 @@ public class TestGravitinoMetadataGetSystemTable { .thenReturn(Optional.empty()); GravitinoMetadata gravitinoMetadata = - new GravitinoMetadata(catalogConnectorMetadata, metadataAdapter, internalMetadata); + new TestGravitinoMetadata(catalogConnectorMetadata, metadataAdapter, internalMetadata); Optional<SystemTable> result = gravitinoMetadata.getSystemTable(session, tableName); assertFalse(result.isPresent()); verify(internalMetadata).getSystemTable(session, tableName); } + + private static final class TestGravitinoMetadata extends GravitinoMetadata { + private TestGravitinoMetadata( + CatalogConnectorMetadata catalogConnectorMetadata, + CatalogConnectorMetadataAdapter metadataAdapter, + ConnectorMetadata internalMetadata) { + super(catalogConnectorMetadata, metadataAdapter, internalMetadata); + } + } } diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoPlugin.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoPlugin.java deleted file mode 100644 index 044deb527f..0000000000 --- a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoPlugin.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.gravitino.trino.connector; - -import com.google.common.collect.ImmutableList; -import io.trino.spi.connector.ConnectorFactory; -import org.apache.gravitino.client.GravitinoAdminClient; -import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager; - -public class TestGravitinoPlugin extends GravitinoPlugin { - private TestGravitinoConnectorFactory factory; - - private final GravitinoAdminClient gravitinoClient; - - public TestGravitinoPlugin(GravitinoAdminClient gravitinoClient) { - this.gravitinoClient = gravitinoClient; - } - - @Override - public Iterable<ConnectorFactory> getConnectorFactories() { - factory = new TestGravitinoConnectorFactory(); - factory.setGravitinoClient(gravitinoClient); - return ImmutableList.of(factory); - } - - public CatalogConnectorManager getCatalogConnectorManager() { - return factory.getCatalogConnectorManager(); - } -} diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/TestCatalogConnectorManager.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/TestCatalogConnectorManager.java new file mode 100644 index 0000000000..f03cf0a474 --- /dev/null +++ b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/TestCatalogConnectorManager.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.trino.connector.catalog; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorContext; +import org.apache.gravitino.client.GravitinoAdminClient; +import org.apache.gravitino.trino.connector.GravitinoConfig; +import org.apache.gravitino.trino.connector.GravitinoErrorCode; +import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog; +import org.junit.jupiter.api.Test; + +public class TestCatalogConnectorManager { + + @Test + public void testSingleMetalakeCatalogNaming() throws Exception { + CatalogConnectorManager manager = + createManager( + ImmutableMap.of( + "gravitino.uri", + "http://127.0.0.1:8090", + "gravitino.metalake", + "test", + "gravitino.use-single-metalake", + "true")); + + assertEquals("memory", manager.getTrinoCatalogName("test", "memory")); + } + + @Test + public void testMultiMetalakeCatalogNaming() throws Exception { + CatalogConnectorManager manager = + createManager( + ImmutableMap.of( + "gravitino.uri", + "http://127.0.0.1:8090", + "gravitino.metalake", + "test", + "gravitino.use-single-metalake", + "false")); + + assertEquals("\"test.memory\"", manager.getTrinoCatalogName("test", "memory")); + } + + @Test + public void testSingleMetalakeRejectsDifferentMetalakeConnector() throws Exception { + CatalogConnectorFactory catalogFactory = createCatalogConnectorFactory(); + CatalogConnectorManager manager = + createManager( + catalogFactory, + ImmutableMap.of( + "gravitino.uri", + "http://127.0.0.1:8090", + "gravitino.metalake", + "test", + "gravitino.use-single-metalake", + "true")); + + GravitinoConfig connectorConfig = createConnectorConfig(catalogConfigJson("test", "memory")); + assertDoesNotThrow( + () -> manager.createCatalogConnectorContext("test0", connectorConfig, mockContext())); + + GravitinoConfig otherConnectorConfig = + createConnectorConfig(catalogConfigJson("test2", "memory")); + TrinoException error = + assertThrows( + TrinoException.class, + () -> + manager.createCatalogConnectorContext( + "test1", otherConnectorConfig, mockContext())); + assertEquals(GravitinoErrorCode.GRAVITINO_OPERATION_FAILED.toErrorCode(), error.getErrorCode()); + assertTrue(error.getMessage().contains("Failed to create connector")); + assertTrue(error.getCause().getMessage().contains("Multiple metalakes are not supported")); + } + + @Test + public void testMultiMetalakeAllowsDifferentMetalakeConnector() throws Exception { + CatalogConnectorFactory catalogFactory = createCatalogConnectorFactory(); + CatalogConnectorManager manager = + createManager( + catalogFactory, + ImmutableMap.of( + "gravitino.uri", + "http://127.0.0.1:8090", + "gravitino.metalake", + "test", + "gravitino.use-single-metalake", + "false")); + + GravitinoConfig connectorConfig = createConnectorConfig(catalogConfigJson("test", "memory")); + assertDoesNotThrow( + () -> manager.createCatalogConnectorContext("test0", connectorConfig, mockContext())); + + GravitinoConfig otherConnectorConfig = + createConnectorConfig(catalogConfigJson("test2", "memory")); + assertDoesNotThrow( + () -> manager.createCatalogConnectorContext("test1", otherConnectorConfig, mockContext())); + } + + @Test + public void testSkipCatalogPatterns() throws Exception { + CatalogConnectorManager manager = + createManager( + ImmutableMap.of( + "gravitino.uri", + "http://127.0.0.1:8090", + "gravitino.metalake", + "test1", + "gravitino.trino.skip-catalog-patterns", + "a.*, b1")); + + assertTrue(manager.skipCatalog("a1")); + assertTrue(manager.skipCatalog("b1")); + assertFalse(manager.skipCatalog("b2")); + } + + private CatalogConnectorManager createManager(ImmutableMap<String, String> configMap) + throws Exception { + return createManager(createCatalogConnectorFactory(), configMap); + } + + private CatalogConnectorManager createManager( + CatalogConnectorFactory catalogFactory, ImmutableMap<String, String> configMap) { + CatalogRegister catalogRegister = mock(CatalogRegister.class); + + boolean singleMetalakeMode = + configMap.getOrDefault("gravitino.use-single-metalake", "true").equals("true"); + CatalogConnectorManager manager = + new CatalogConnectorManager( + catalogRegister, + catalogFactory, + singleMetalakeMode + ? null + : (metalake, catalog) -> String.format("\"%s.%s\"", metalake, catalog)); + manager.config(new GravitinoConfig(configMap), mock(GravitinoAdminClient.class)); + return manager; + } + + private CatalogConnectorFactory createCatalogConnectorFactory() throws Exception { + CatalogConnectorFactory catalogFactory = mock(CatalogConnectorFactory.class); + CatalogConnectorContext.Builder builder = mock(CatalogConnectorContext.Builder.class); + when(catalogFactory.createCatalogConnectorContextBuilder(any())).thenReturn(builder); + when(builder.withMetalake(any())).thenReturn(builder); + when(builder.withContext(any())).thenReturn(builder); + when(builder.build()).thenReturn(mock(CatalogConnectorContext.class)); + return catalogFactory; + } + + private static GravitinoConfig createConnectorConfig(String catalogConfigJson) { + return new GravitinoConfig( + ImmutableMap.of( + "gravitino.uri", + "http://127.0.0.1:8090", + "gravitino.metalake", + "test", + GravitinoConfig.GRAVITINO_DYNAMIC_CONNECTOR, + "true", + GravitinoConfig.GRAVITINO_DYNAMIC_CONNECTOR_CATALOG_CONFIG, + catalogConfigJson)); + } + + private static String catalogConfigJson(String metalake, String name) throws Exception { + GravitinoCatalog catalog = + new GravitinoCatalog(metalake, "memory", name, ImmutableMap.of(), 0L); + return GravitinoCatalog.toJson(catalog); + } + + private static ConnectorContext mockContext() { + return mock(ConnectorContext.class); + } +} diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/hive/TestHiveDataTypeConverter.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/hive/TestHiveDataTypeConverter.java index 7bf5c49048..315d3c8a9a 100644 --- a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/hive/TestHiveDataTypeConverter.java +++ b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/hive/TestHiveDataTypeConverter.java @@ -22,7 +22,6 @@ package org.apache.gravitino.trino.connector.catalog.hive; import io.trino.spi.TrinoException; import org.apache.gravitino.rel.types.Types; import org.apache.gravitino.trino.connector.util.GeneralDataTypeTransformer; -import org.junit.Assert; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -43,7 +42,7 @@ public class TestHiveDataTypeConverter { io.trino.spi.type.Type charLengthIsOverflow = io.trino.spi.type.CharType.createCharType(256); Exception e = - Assert.assertThrows( + Assertions.assertThrows( TrinoException.class, () -> generalDataTypeTransformer.getGravitinoType(charLengthIsOverflow)); Assertions.assertTrue( @@ -63,7 +62,7 @@ public class TestHiveDataTypeConverter { io.trino.spi.type.Type varcharLengthIsOverflow = io.trino.spi.type.VarcharType.createVarcharType(65536); e = - Assert.assertThrows( + Assertions.assertThrows( TrinoException.class, () -> generalDataTypeTransformer.getGravitinoType(varcharLengthIsOverflow)); Assertions.assertTrue( diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/jdbc/postgresql/TestPostgreSQLDataTypeTransformer.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/jdbc/postgresql/TestPostgreSQLDataTypeTransformer.java index 9afe765eb0..831fce11f7 100644 --- a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/jdbc/postgresql/TestPostgreSQLDataTypeTransformer.java +++ b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/jdbc/postgresql/TestPostgreSQLDataTypeTransformer.java @@ -22,7 +22,7 @@ package org.apache.gravitino.trino.connector.catalog.jdbc.postgresql; import org.apache.gravitino.rel.types.Type; import org.apache.gravitino.rel.types.Types; import org.apache.gravitino.trino.connector.util.GeneralDataTypeTransformer; -import org.junit.Assert; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; public class TestPostgreSQLDataTypeTransformer { @@ -31,29 +31,29 @@ public class TestPostgreSQLDataTypeTransformer { public void testTrinoTypeToGravitinoType() { GeneralDataTypeTransformer generalDataTypeTransformer = new PostgreSQLDataTypeTransformer(); io.trino.spi.type.Type charTypeWithLengthOne = io.trino.spi.type.CharType.createCharType(1); - Assert.assertEquals( + Assertions.assertEquals( generalDataTypeTransformer.getGravitinoType(charTypeWithLengthOne), Types.FixedCharType.of(1)); io.trino.spi.type.Type charTypeWithLength = io.trino.spi.type.CharType.createCharType(65536); - Assert.assertEquals( + Assertions.assertEquals( generalDataTypeTransformer.getGravitinoType(charTypeWithLength), Types.FixedCharType.of(65536)); io.trino.spi.type.Type varcharType = io.trino.spi.type.VarcharType.createVarcharType(1); - Assert.assertEquals( + Assertions.assertEquals( generalDataTypeTransformer.getGravitinoType(varcharType), Types.VarCharType.of(1)); io.trino.spi.type.Type varcharTypeWithLength = io.trino.spi.type.VarcharType.createVarcharType(65536); - Assert.assertEquals( + Assertions.assertEquals( generalDataTypeTransformer.getGravitinoType(varcharTypeWithLength), Types.VarCharType.of(65536)); io.trino.spi.type.Type varcharTypeWithLength2 = io.trino.spi.type.VarcharType.createUnboundedVarcharType(); - Assert.assertEquals( + Assertions.assertEquals( generalDataTypeTransformer.getGravitinoType(varcharTypeWithLength2), Types.StringType.get()); } @@ -62,7 +62,7 @@ public class TestPostgreSQLDataTypeTransformer { public void testGravitinoCharToTrinoType() { GeneralDataTypeTransformer generalDataTypeTransformer = new PostgreSQLDataTypeTransformer(); Type stringType = Types.StringType.get(); - Assert.assertEquals( + Assertions.assertEquals( generalDataTypeTransformer.getTrinoType(stringType), io.trino.spi.type.VarcharType.createUnboundedVarcharType()); } diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/system/TestGravitinoSystemConnector.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/system/TestGravitinoSystemConnector.java index 372b9f2b88..11b5538b08 100644 --- a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/system/TestGravitinoSystemConnector.java +++ b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/system/TestGravitinoSystemConnector.java @@ -21,18 +21,23 @@ package org.apache.gravitino.trino.connector.system; import io.trino.spi.Page; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; public class TestGravitinoSystemConnector { @Test public void testSystemTablePageSourceReturnsPageOnlyOnce() throws Exception { Page page = new Page(0); try (GravitinoSystemConnector.SystemTablePageSource pageSource = - new GravitinoSystemConnector.SystemTablePageSource(page)) { + Mockito.mock( + GravitinoSystemConnector.SystemTablePageSource.class, + Mockito.withSettings() + .useConstructor(page) + .defaultAnswer(Mockito.CALLS_REAL_METHODS))) { Assertions.assertFalse(pageSource.isFinished()); - Assertions.assertSame(page, pageSource.getNextPage()); + Assertions.assertSame(page, pageSource.nextPage()); Assertions.assertTrue(pageSource.isFinished()); - Assertions.assertNull(pageSource.getNextPage()); + Assertions.assertNull(pageSource.nextPage()); } } @@ -40,18 +45,22 @@ public class TestGravitinoSystemConnector { public void testSystemTablePageSourceMultipleGetNextPageCalls() throws Exception { Page page = new Page(0); try (GravitinoSystemConnector.SystemTablePageSource pageSource = - new GravitinoSystemConnector.SystemTablePageSource(page)) { + Mockito.mock( + GravitinoSystemConnector.SystemTablePageSource.class, + Mockito.withSettings() + .useConstructor(page) + .defaultAnswer(Mockito.CALLS_REAL_METHODS))) { // First call should return the page - Page firstPage = pageSource.getNextPage(); + Page firstPage = pageSource.nextPage(); Assertions.assertNotNull(firstPage); Assertions.assertSame(page, firstPage); Assertions.assertTrue(pageSource.isFinished()); // Subsequent calls should return null - Assertions.assertNull(pageSource.getNextPage()); - Assertions.assertNull(pageSource.getNextPage()); - Assertions.assertNull(pageSource.getNextPage()); + Assertions.assertNull(pageSource.nextPage()); + Assertions.assertNull(pageSource.nextPage()); + Assertions.assertNull(pageSource.nextPage()); Assertions.assertTrue(pageSource.isFinished()); } }
