This is an automated email from the ASF dual-hosted git repository.
yuqi4733 pushed a commit to branch branch-trino-smv
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-trino-smv by this push:
new fb88c95ccb [#9718] feat (trino-connector): Split Trino connector to
common layer and adapter for Trino SIP changes (#9735)
fb88c95ccb is described below
commit fb88c95ccb3083663a35f6d1a3c8a570f327b12a
Author: Yuhui <[email protected]>
AuthorDate: Thu Jan 29 19:12:17 2026 +0800
[#9718] feat (trino-connector): Split Trino connector to common layer and
adapter for Trino SIP changes (#9735)
### What changes were proposed in this pull request?
Split Trino connector to common layer and adapter for Trino SIP changes
### Why are the changes needed?
Fix: #9718
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
UTs
---
.github/workflows/frontend-integration-test.yml | 9 +-
.github/workflows/trino-integration-test.yml | 7 +-
trino-connector/trino-connector/build.gradle.kts | 34 +---
.../trino/connector/GravitinoConnector.java | 37 ++--
.../trino/connector/GravitinoConnectorFactory.java | 109 ++++++++++--
.../trino/connector/GravitinoMetadata.java | 54 +-----
.../GravitinoNodePartitioningProvider.java | 13 --
.../gravitino/trino/connector/GravitinoPlugin.java | 28 ++-
.../gravitino/trino/connector/GravitinoSplit.java | 7 +-
.../trino/connector/GravitinoSplitManager.java | 9 +-
.../trino/connector/GravitinoSplitSource.java | 9 +-
.../connector/catalog/CatalogConnectorContext.java | 11 +-
.../connector/catalog/CatalogConnectorManager.java | 61 ++++---
.../catalog/CatalogConnectorMetadata.java | 21 +++
.../trino/connector/catalog/CatalogRegister.java | 67 ++-----
.../connector/system/GravitinoSystemConnector.java | 50 ++++--
.../trino/connector/util/json/BlockJsonSerde.java | 13 +-
.../connector/AbstractGravitinoConnectorTest.java | 79 +++++++++
.../trino/connector/GravitinoMockServer.java | 56 +++++-
.../connector/TestCreateGravitinoConnector.java | 104 -----------
.../trino/connector/TestGravitinoConfig.java | 8 -
.../trino/connector/TestGravitinoConnector.java | 112 +++++-------
.../connector/TestGravitinoConnectorFactory.java | 35 ----
.../TestGravitinoConnectorNullChecks.java | 31 +++-
...tGravitinoConnectorWithMetalakeCatalogName.java | 123 +++++--------
.../TestGravitinoConnectorWithSkipCatalog.java | 128 -------------
.../TestGravitinoMetadataGetSystemTable.java | 13 +-
.../trino/connector/TestGravitinoPlugin.java | 45 -----
.../catalog/TestCatalogConnectorManager.java | 197 +++++++++++++++++++++
.../catalog/hive/TestHiveDataTypeConverter.java | 5 +-
.../TestPostgreSQLDataTypeTransformer.java | 14 +-
.../system/TestGravitinoSystemConnector.java | 25 ++-
32 files changed, 798 insertions(+), 716 deletions(-)
diff --git a/.github/workflows/frontend-integration-test.yml
b/.github/workflows/frontend-integration-test.yml
index 841b4f390f..71f788371c 100644
--- a/.github/workflows/frontend-integration-test.yml
+++ b/.github/workflows/frontend-integration-test.yml
@@ -1,11 +1,8 @@
name: Frontend Integration Test
# Controls when the workflow will run
-on:
- push:
- branches: [ "main", "branch-*" ]
- pull_request:
- branches: [ "main", "branch-*" ]
+# Disable it temporarily.
+on: {}
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number ||
github.ref }}
@@ -99,4 +96,4 @@ jobs:
distribution/package/logs/gravitino-server.log
catalogs/**/*.log
catalogs/**/*.tar
- web/integration-test/build/*.log
\ No newline at end of file
+ web/integration-test/build/*.log
diff --git a/.github/workflows/trino-integration-test.yml
b/.github/workflows/trino-integration-test.yml
index 6f80fc7bad..a8097e0337 100644
--- a/.github/workflows/trino-integration-test.yml
+++ b/.github/workflows/trino-integration-test.yml
@@ -1,11 +1,8 @@
name: Trino Integration Test
# Controls when the workflow will run
-on:
- push:
- branches: [ "main", "branch-*" ]
- pull_request:
- branches: [ "main", "branch-*" ]
+# Disable it temporarily.
+on: {}
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number ||
github.ref }}
diff --git a/trino-connector/trino-connector/build.gradle.kts
b/trino-connector/trino-connector/build.gradle.kts
index c9bd1d8d6c..39c7a078c0 100644
--- a/trino-connector/trino-connector/build.gradle.kts
+++ b/trino-connector/trino-connector/build.gradle.kts
@@ -16,8 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
+
plugins {
- `maven-publish`
id("java")
id("idea")
}
@@ -26,6 +26,10 @@ repositories {
mavenCentral()
}
+val trinoVersionProvider =
+ providers.gradleProperty("trinoVersion").map { it.toInt() }.orElse(435)
+val trinoVersion = trinoVersionProvider.get()
+
dependencies {
implementation(project(":catalogs:catalog-common"))
implementation(project(":clients:client-java-runtime", configuration =
"shadow"))
@@ -33,40 +37,20 @@ dependencies {
implementation(libs.bundles.log4j)
implementation(libs.commons.collections4)
implementation(libs.commons.lang3)
- implementation(libs.trino.jdbc)
+ implementation("io.trino:trino-jdbc:$trinoVersion")
compileOnly(libs.airlift.resolver)
- compileOnly(libs.trino.spi) {
+ compileOnly("io.trino:trino-spi:$trinoVersion") {
exclude("org.apache.logging.log4j")
}
testImplementation(libs.awaitility)
testImplementation(libs.mockito.core)
testImplementation(libs.mysql.driver)
- testImplementation(libs.trino.memory) {
+ testImplementation("io.trino:trino-memory:$trinoVersion") {
exclude("org.antlr")
exclude("org.apache.logging.log4j")
}
- testImplementation(libs.trino.testing) {
+ testImplementation("io.trino:trino-testing:$trinoVersion") {
exclude("org.apache.logging.log4j")
}
testRuntimeOnly(libs.junit.jupiter.engine)
}
-
-tasks.named("generateMetadataFileForMavenJavaPublication") {
- dependsOn(":trino-connector:trino-connector:copyDepends")
-}
-
-tasks {
- val copyDepends by registering(Copy::class) {
- from(configurations.runtimeClasspath)
- into("build/libs")
- }
- jar {
- finalizedBy(copyDepends)
- }
-
- register("copyLibs", Copy::class) {
- dependsOn(copyDepends, "build")
- from("build/libs")
- into("$rootDir/distribution/${rootProject.name}-trino-connector")
- }
-}
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java
index c9b48b50d0..6247be1d4f 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java
@@ -18,7 +18,10 @@
*/
package org.apache.gravitino.trino.connector;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+
import com.google.common.base.Preconditions;
+import io.trino.spi.TrinoException;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorAccessControl;
import io.trino.spi.connector.ConnectorCapabilities;
@@ -35,9 +38,9 @@ import io.trino.spi.transaction.IsolationLevel;
import java.util.List;
import java.util.Set;
import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.client.GravitinoMetalake;
import org.apache.gravitino.trino.connector.catalog.CatalogConnectorContext;
import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
+import
org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
/**
* GravitinoConnector serves as the entry point for operations on the
connector managed by Trino and
@@ -47,19 +50,20 @@ import
org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
public class GravitinoConnector implements Connector {
private final NameIdentifier catalogIdentifier;
- private final CatalogConnectorContext catalogConnectorContext;
+ protected final CatalogConnectorContext catalogConnectorContext;
+ private final CatalogConnectorMetadata connectorMetadata;
/**
* Constructs a new GravitinoConnector with the specified catalog identifier
and catalog connector
* context.
*
- * @param catalogIdentifier the catalog identifier
* @param catalogConnectorContext the catalog connector context
*/
- public GravitinoConnector(
- NameIdentifier catalogIdentifier, CatalogConnectorContext
catalogConnectorContext) {
- this.catalogIdentifier = catalogIdentifier;
+ public GravitinoConnector(CatalogConnectorContext catalogConnectorContext) {
+ this.catalogIdentifier =
catalogConnectorContext.getCatalog().geNameIdentifier();
this.catalogConnectorContext = catalogConnectorContext;
+ this.connectorMetadata =
+ new CatalogConnectorMetadata(catalogConnectorContext.getMetalake(),
this.catalogIdentifier);
}
@Override
@@ -86,14 +90,15 @@ public class GravitinoConnector implements Connector {
ConnectorMetadata internalMetadata =
internalConnector.getMetadata(session,
gravitinoTransactionHandle.getInternalHandle());
Preconditions.checkArgument(internalMetadata != null, "Internal metadata
must not be null");
+ return createGravitinoMetadata(
+ connectorMetadata, catalogConnectorContext.getMetadataAdapter(),
internalMetadata);
+ }
- GravitinoMetalake metalake = catalogConnectorContext.getMetalake();
-
- CatalogConnectorMetadata catalogConnectorMetadata =
- new CatalogConnectorMetadata(metalake, catalogIdentifier);
-
- return new GravitinoMetadata(
- catalogConnectorMetadata,
catalogConnectorContext.getMetadataAdapter(), internalMetadata);
+ protected GravitinoMetadata createGravitinoMetadata(
+ CatalogConnectorMetadata catalogConnectorMetadata,
+ CatalogConnectorMetadataAdapter metadataAdapter,
+ ConnectorMetadata internalMetadata) {
+ throw new TrinoException(NOT_SUPPORTED, "Should be overridden in
subclass");
}
@Override
@@ -172,4 +177,10 @@ public class GravitinoConnector implements Connector {
internalConnector.getNodePartitioningProvider();
return new GravitinoNodePartitioningProvider(nodePartitioningProvider);
}
+
+ public void shutdown() {
+ Connector internalConnector =
catalogConnectorContext.getInternalConnector();
+ internalConnector.shutdown();
+ catalogConnectorContext.close();
+ }
}
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnectorFactory.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnectorFactory.java
index 2bbcf61e4e..c9fd7e9392 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnectorFactory.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnectorFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.gravitino.trino.connector;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static
org.apache.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_RUNTIME_ERROR;
import com.google.common.annotations.VisibleForTesting;
@@ -28,9 +29,9 @@ import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorContext;
import io.trino.spi.connector.ConnectorFactory;
import java.util.Map;
-import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.trino.connector.catalog.CatalogConnectorContext;
import org.apache.gravitino.trino.connector.catalog.CatalogConnectorFactory;
import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager;
import org.apache.gravitino.trino.connector.catalog.CatalogRegister;
@@ -45,6 +46,8 @@ import org.slf4j.LoggerFactory;
public class GravitinoConnectorFactory implements ConnectorFactory {
private static final Logger LOG =
LoggerFactory.getLogger(GravitinoConnectorFactory.class);
+ private static final int MIN_SUPPORT_TRINO_SPI_VERSION = 435;
+ private static final int MAX_SUPPORT_TRINO_SPI_VERSION = Integer.MAX_VALUE;
/** The default connector name. */
public static final String DEFAULT_CONNECTOR_NAME = "gravitino";
@@ -53,6 +56,13 @@ public class GravitinoConnectorFactory implements
ConnectorFactory {
private CatalogConnectorManager catalogConnectorManager;
+ private GravitinoAdminClient client;
+ private int trinoVersion;
+
+ public GravitinoConnectorFactory(GravitinoAdminClient client) {
+ this.client = client;
+ }
+
@Override
public String getName() {
return DEFAULT_CONNECTOR_NAME;
@@ -74,25 +84,31 @@ public class GravitinoConnectorFactory implements
ConnectorFactory {
*
* @param catalogName the connector name of catalog
* @param requiredConfig the config of connector
- * @param context Trino connector context
* @return Trino connector
*/
@Override
public Connector create(
- String catalogName, Map<String, String> requiredConfig, ConnectorContext
context) {
+ String catalogName,
+ Map<String, String> requiredConfig,
+ ConnectorContext trinoConnectorContext) {
Preconditions.checkArgument(requiredConfig != null, "requiredConfig is not
null");
GravitinoConfig config = new GravitinoConfig(requiredConfig);
synchronized (this) {
if (catalogConnectorManager == null) {
+ checkTrinoSpiVersion(trinoConnectorContext, config);
try {
CatalogRegister catalogRegister = new CatalogRegister();
CatalogConnectorFactory catalogConnectorFactory =
createCatalogConnectorFactory(config);
catalogConnectorManager =
- new CatalogConnectorManager(catalogRegister,
catalogConnectorFactory);
- catalogConnectorManager.config(config, clientProvider().get());
- catalogConnectorManager.start(context);
+ new CatalogConnectorManager(
+ catalogRegister, catalogConnectorFactory,
this::getTrinoCatalogName);
+ catalogConnectorManager.config(config, client);
+
+ if (isCoordinator(trinoConnectorContext)) {
+ catalogConnectorManager.start();
+ }
gravitinoSystemTableFactory = new
GravitinoSystemTableFactory(catalogConnectorManager);
} catch (Exception e) {
@@ -106,7 +122,12 @@ public class GravitinoConnectorFactory implements
ConnectorFactory {
if (config.isDynamicConnector()) {
// The dynamic connector is an instance of GravitinoConnector. It is
loaded from Gravitino
// server.
- return catalogConnectorManager.createConnector(catalogName, config,
context);
+ CatalogConnectorContext catalogConnectorContext =
+ catalogConnectorManager.createCatalogConnectorContext(
+ catalogName, config, trinoConnectorContext);
+ GravitinoConnector catalogConnector =
createConnector(catalogConnectorContext);
+ catalogConnectorContext.bindConnector(catalogConnector);
+ return catalogConnectorContext.getConnector();
} else {
// The static connector is an instance of GravitinoSystemConnector. It
is loaded by Trino
// using the connector configuration.
@@ -117,13 +138,75 @@ public class GravitinoConnectorFactory implements
ConnectorFactory {
}
GravitinoStoredProcedureFactory gravitinoStoredProcedureFactory =
new GravitinoStoredProcedureFactory(catalogConnectorManager,
metalake);
- return new GravitinoSystemConnector(gravitinoStoredProcedureFactory);
+ return createSystemConnector(gravitinoStoredProcedureFactory);
}
}
- @VisibleForTesting
- Supplier<GravitinoAdminClient> clientProvider() {
- return () -> null;
+ protected GravitinoConnector createConnector(CatalogConnectorContext
connectorContext) {
+ throw new TrinoException(NOT_SUPPORTED, "Should be overridden in
subclass");
+ }
+
+ protected GravitinoSystemConnector createSystemConnector(
+ GravitinoStoredProcedureFactory storedProcedureFactory) {
+ return new GravitinoSystemConnector(storedProcedureFactory);
+ }
+
+ protected String getTrinoCatalogName(String metalakeName, String
catalogName) {
+ return "\"" + metalakeName + "." + catalogName + "\"";
+ }
+
+ private void checkTrinoSpiVersion(ConnectorContext context, GravitinoConfig
config) {
+ String spiVersion = context.getSpiVersion();
+ trinoVersion = Integer.parseInt(spiVersion);
+
+ // check catalog name with metalake are supported in this trino version
+ if (!config.singleMetalakeMode() && !supportCatalogNameWithMetalake()) {
+ String errmsg =
+ String.format(
+ "The trino-connector-%s-%s does not support catalog name with
metalake.",
+ getMinSupportTrinoSpiVersion(), getMaxSupportTrinoSpiVersion());
+ throw new
TrinoException(GravitinoErrorCode.GRAVITINO_UNSUPPORTED_TRINO_VERSION, errmsg);
+ }
+
+ // skip version validation
+ boolean spiVersionCheck = config.isSkipTrinoVersionValidation();
+ if (spiVersionCheck) {
+ if (trinoVersion < getMinSupportTrinoSpiVersion()
+ || trinoVersion > getMaxSupportTrinoSpiVersion()) {
+ LOG.warn(
+ "The version {} has not undergone thorough testing with Gravitino,
there may be compatibility problem.",
+ trinoVersion);
+ }
+ return;
+ }
+
+ // version validation
+ if (trinoVersion < getMinSupportTrinoSpiVersion()
+ || trinoVersion > getMaxSupportTrinoSpiVersion()) {
+ String errmsg =
+ String.format(
+ "Unsupported Trino-%s version. The Supported version for the
Gravitino-Trino-connector from Trino-%d to Trino-%d."
+ + "Maybe you can set gravitino.trino.skip-version-validation
to skip version validation.",
+ trinoVersion, getMinSupportTrinoSpiVersion(),
getMaxSupportTrinoSpiVersion());
+ throw new
TrinoException(GravitinoErrorCode.GRAVITINO_UNSUPPORTED_TRINO_VERSION, errmsg);
+ }
+ }
+
+ protected boolean supportCatalogNameWithMetalake() {
+ return true;
+ }
+
+ protected int getMinSupportTrinoSpiVersion() {
+ return MIN_SUPPORT_TRINO_SPI_VERSION;
+ }
+
+ protected int getMaxSupportTrinoSpiVersion() {
+ return MAX_SUPPORT_TRINO_SPI_VERSION;
+ }
+
+ @SuppressWarnings("deprecation")
+ protected boolean isCoordinator(ConnectorContext connectorContext) {
+ return connectorContext.getNodeManager().getCurrentNode().isCoordinator();
}
private CatalogConnectorFactory
createCatalogConnectorFactory(GravitinoConfig config) {
@@ -145,4 +228,8 @@ public class GravitinoConnectorFactory implements
ConnectorFactory {
GRAVITINO_RUNTIME_ERROR, "Can not create CatalogConnectorFactory ",
e);
}
}
+
+ public int getTrinoVersion() {
+ return trinoVersion;
+ }
}
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
index eadb9d5bde..5bd0b0e607 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java
@@ -23,7 +23,6 @@ import static
org.apache.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import io.airlift.slice.Slice;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.AggregateFunction;
import io.trino.spi.connector.AggregationApplicationResult;
@@ -31,9 +30,7 @@ import io.trino.spi.connector.Assignment;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorInsertTableHandle;
-import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorMetadata;
-import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.connector.ConnectorPartitioningHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
@@ -58,10 +55,8 @@ import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.expression.Constant;
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.statistics.ColumnStatistics;
-import io.trino.spi.statistics.ComputedStatistics;
import io.trino.spi.statistics.TableStatistics;
import io.trino.spi.type.Type;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -71,7 +66,6 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata;
import
org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
-import org.apache.gravitino.trino.connector.metadata.GravitinoColumn;
import org.apache.gravitino.trino.connector.metadata.GravitinoSchema;
import org.apache.gravitino.trino.connector.metadata.GravitinoTable;
@@ -80,18 +74,18 @@ import
org.apache.gravitino.trino.connector.metadata.GravitinoTable;
* server. It also transforms the different metadata formats between Trino and
Gravitino.
* Additionally, it wraps the internal connector metadata for accessing data.
*/
-public class GravitinoMetadata implements ConnectorMetadata {
+public abstract class GravitinoMetadata implements ConnectorMetadata {
// The column handle name that will generate row IDs for the merge operation.
public static final String MERGE_ROW_ID = "$row_id";
// Handling metadata operations on gravitino server
- private final CatalogConnectorMetadata catalogConnectorMetadata;
+ protected final CatalogConnectorMetadata catalogConnectorMetadata;
// Transform different metadata format
- private final CatalogConnectorMetadataAdapter metadataAdapter;
+ protected final CatalogConnectorMetadataAdapter metadataAdapter;
- private final ConnectorMetadata internalMetadata;
+ protected final ConnectorMetadata internalMetadata;
/**
* Constructs a new GravitinoMetadata instance.
@@ -253,16 +247,6 @@ public class GravitinoMetadata implements
ConnectorMetadata {
return new GravitinoInsertTableHandle(insertTableHandle);
}
- @Override
- public Optional<ConnectorOutputMetadata> finishInsert(
- ConnectorSession session,
- ConnectorInsertTableHandle insertHandle,
- Collection<Slice> fragments,
- Collection<ComputedStatistics> computedStatistics) {
- return internalMetadata.finishInsert(
- session, GravitinoHandle.unWrap(insertHandle), fragments,
computedStatistics);
- }
-
@Override
public void renameSchema(ConnectorSession session, String source, String
target) {
catalogConnectorMetadata.renameSchema(source, target);
@@ -293,13 +277,6 @@ public class GravitinoMetadata implements
ConnectorMetadata {
catalogConnectorMetadata.setTableProperties(getTableName(tableHandle),
allProps);
}
- @Override
- public void addColumn(
- ConnectorSession session, ConnectorTableHandle tableHandle,
ColumnMetadata column) {
- GravitinoColumn gravitinoColumn = metadataAdapter.createColumn(column);
- catalogConnectorMetadata.addColumn(getTableName(tableHandle),
gravitinoColumn);
- }
-
@Override
public void dropColumn(
ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle
column) {
@@ -616,27 +593,6 @@ public class GravitinoMetadata implements
ConnectorMetadata {
return updateLayout.map(GravitinoPartitioningHandle::new);
}
- @Override
- public ConnectorMergeTableHandle beginMerge(
- ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode
retryMode) {
- ConnectorMergeTableHandle connectorMergeTableHandle =
- internalMetadata.beginMerge(session,
GravitinoHandle.unWrap(tableHandle), retryMode);
- SchemaTableName tableName = getTableName(tableHandle);
-
- return new GravitinoMergeTableHandle(
- tableName.getSchemaName(), tableName.getTableName(),
connectorMergeTableHandle);
- }
-
- @Override
- public void finishMerge(
- ConnectorSession session,
- ConnectorMergeTableHandle mergeTableHandle,
- Collection<Slice> fragments,
- Collection<ComputedStatistics> computedStatistics) {
- internalMetadata.finishMerge(
- session, GravitinoHandle.unWrap(mergeTableHandle), fragments,
computedStatistics);
- }
-
@Override
public Optional<ConnectorTableHandle> applyUpdate(
ConnectorSession session,
@@ -702,7 +658,7 @@ public class GravitinoMetadata implements ConnectorMetadata
{
: new ConnectorTableLayout(result.getPartitionColumns()));
}
- private SchemaTableName getTableName(ConnectorTableHandle tableHandle) {
+ protected SchemaTableName getTableName(ConnectorTableHandle tableHandle) {
return ((GravitinoTableHandle) tableHandle).toSchemaTableName();
}
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoNodePartitioningProvider.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoNodePartitioningProvider.java
index 8226b70d14..1780fbfa11 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoNodePartitioningProvider.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoNodePartitioningProvider.java
@@ -23,12 +23,10 @@ import io.trino.spi.connector.ConnectorBucketNodeMap;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
import io.trino.spi.connector.ConnectorPartitioningHandle;
import io.trino.spi.connector.ConnectorSession;
-import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.type.Type;
import java.util.List;
import java.util.Optional;
-import java.util.function.ToIntFunction;
/**
* This class provides a ConnectorNodePartitioningProvider for Trino to decide
data partitioning
@@ -59,17 +57,6 @@ public class GravitinoNodePartitioningProvider implements
ConnectorNodePartition
GravitinoHandle.unWrap(partitioningHandle));
}
- @Override
- public ToIntFunction<ConnectorSplit> getSplitBucketFunction(
- ConnectorTransactionHandle transactionHandle,
- ConnectorSession session,
- ConnectorPartitioningHandle partitioningHandle) {
- return nodePartitioningProvider.getSplitBucketFunction(
- GravitinoHandle.unWrap(transactionHandle),
- session,
- GravitinoHandle.unWrap(partitioningHandle));
- }
-
@Override
public BucketFunction getBucketFunction(
ConnectorTransactionHandle transactionHandle,
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPlugin.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPlugin.java
index 2b4332845c..80b4b96b66 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPlugin.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPlugin.java
@@ -18,15 +18,41 @@
*/
package org.apache.gravitino.trino.connector;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.trino.spi.Plugin;
import io.trino.spi.connector.ConnectorFactory;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager;
/** Trino plugin endpoint, using java spi mechanism */
public class GravitinoPlugin implements Plugin {
+ private GravitinoConnectorFactory factory;
+ protected GravitinoAdminClient client;
+
+ public GravitinoPlugin() {}
+
+ public GravitinoPlugin(GravitinoAdminClient client) {
+ this.client = client;
+ }
@Override
public Iterable<ConnectorFactory> getConnectorFactories() {
- return ImmutableList.of(new GravitinoConnectorFactory());
+ factory = createConnectorFactory(client);
+ return ImmutableList.of(factory);
+ }
+
+ protected GravitinoConnectorFactory
createConnectorFactory(GravitinoAdminClient client) {
+ return new GravitinoConnectorFactory(client);
+ }
+
+ @VisibleForTesting
+ CatalogConnectorManager getCatalogConnectorManager() {
+ return factory.getCatalogConnectorManager();
+ }
+
+ @VisibleForTesting
+ int getTrinoVersion() {
+ return factory.getTrinoVersion();
}
}
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplit.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplit.java
index 6e020dd103..9d70576cbe 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplit.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplit.java
@@ -31,7 +31,7 @@ import java.util.List;
* The GravitinoFTransactionHandle is used to make Apache Gravitino metadata
operations
* transactional and wrap the inner connector transaction for data access.
*/
-public class GravitinoSplit implements ConnectorSplit,
GravitinoHandle<ConnectorSplit> {
+public abstract class GravitinoSplit implements ConnectorSplit,
GravitinoHandle<ConnectorSplit> {
private HandleWrapper<ConnectorSplit> handleWrapper = new
HandleWrapper<>(ConnectorSplit.class);
@@ -75,11 +75,6 @@ public class GravitinoSplit implements ConnectorSplit,
GravitinoHandle<Connector
return handleWrapper.getHandle().getAddresses();
}
- @Override
- public Object getInfo() {
- return handleWrapper.getHandle().getInfo();
- }
-
@Override
public SplitWeight getSplitWeight() {
return handleWrapper.getHandle().getSplitWeight();
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitManager.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitManager.java
index 97f1fcc9ba..d3bdb1ecf0 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitManager.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitManager.java
@@ -18,6 +18,9 @@
*/
package org.apache.gravitino.trino.connector;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+
+import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorSplitSource;
@@ -53,6 +56,10 @@ public class GravitinoSplitManager implements
ConnectorSplitManager {
GravitinoHandle.unWrap(connectorTableHandle),
new GravitinoDynamicFilter(dynamicFilter),
constraint);
- return new GravitinoSplitSource(splits);
+ return createSplitSource(splits);
+ }
+
+ protected ConnectorSplitSource createSplitSource(ConnectorSplitSource
splits) {
+ throw new TrinoException(NOT_SUPPORTED, "Should be overridden in
subclass");
}
}
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitSource.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitSource.java
index 9e8af76b5a..aaa3af89f9 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitSource.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoSplitSource.java
@@ -18,6 +18,9 @@
*/
package org.apache.gravitino.trino.connector;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+
+import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorSplitSource;
import java.util.List;
@@ -49,11 +52,15 @@ public class GravitinoSplitSource implements
ConnectorSplitSource {
.thenApply(
batch -> {
List<ConnectorSplit> list =
-
batch.getSplits().stream().map(GravitinoSplit::new).collect(Collectors.toList());
+
batch.getSplits().stream().map(this::createSplit).collect(Collectors.toList());
return new ConnectorSplitBatch(list, batch.isNoMoreSplits());
});
}
+ protected ConnectorSplit createSplit(ConnectorSplit split) {
+ throw new TrinoException(NOT_SUPPORTED, "Should be overridden in
subclass");
+ }
+
@Override
public void close() {
connectorSplitSource.close();
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorContext.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorContext.java
index 02024b1742..83a8a6124e 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorContext.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorContext.java
@@ -40,7 +40,7 @@ public class CatalogConnectorContext {
private final GravitinoMetalake metalake;
// Connector communicates with Trino
- private final GravitinoConnector connector;
+ private GravitinoConnector connector;
// Internal connector communicates with data storage
private final Connector internalConnector;
@@ -64,8 +64,15 @@ public class CatalogConnectorContext {
this.metalake = metalake;
this.internalConnector = internalConnector;
this.adapter = adapter;
+ }
- this.connector = new GravitinoConnector(catalog.geNameIdentifier(), this);
+ /**
+ * Binds the Gravitino connector to this context.
+ *
+ * @param connector the Gravitino connector
+ */
+ public void bindConnector(GravitinoConnector connector) {
+ this.connector = connector;
}
/**
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java
index e1b8a7e54d..38f5290464 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java
@@ -21,7 +21,6 @@ package org.apache.gravitino.trino.connector.catalog;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.trino.spi.TrinoException;
-import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorContext;
import java.util.Arrays;
import java.util.HashSet;
@@ -35,7 +34,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.client.GravitinoAdminClient;
import org.apache.gravitino.client.GravitinoMetalake;
@@ -76,6 +75,7 @@ public class CatalogConnectorManager {
private GravitinoAdminClient gravitinoClient;
private GravitinoConfig config;
+ private TrinoCatalogNameHandler trinoCatalogNameHandler;
/**
* Constructs a new CatalogConnectorManager with the specified catalog
register and catalog
@@ -85,10 +85,13 @@ public class CatalogConnectorManager {
* @param catalogFactory the catalog connector factory
*/
public CatalogConnectorManager(
- CatalogRegister catalogRegister, CatalogConnectorFactory catalogFactory)
{
+ CatalogRegister catalogRegister,
+ CatalogConnectorFactory catalogFactory,
+ TrinoCatalogNameHandler trinoCatalogNameHandler) {
this.catalogRegister = catalogRegister;
this.catalogConnectorFactory = catalogFactory;
this.executorService = createScheduledThreadPoolExecutor();
+ this.trinoCatalogNameHandler = trinoCatalogNameHandler;
}
private static ScheduledThreadPoolExecutor
createScheduledThreadPoolExecutor() {
@@ -127,19 +130,15 @@ public class CatalogConnectorManager {
/**
* Starts the catalog connector manager with the specified Trino connector
context.
*
- * @param context the Trino connector context
* @throws Exception if the catalog connector manager fails to start
*/
- public void start(ConnectorContext context) throws Exception {
- catalogRegister.init(context, config);
- if (catalogRegister.isCoordinator()) {
- executorService.scheduleWithFixedDelay(
- this::loadMetalake,
- metadataUpdateIntervalSecond,
- metadataUpdateIntervalSecond,
- TimeUnit.SECONDS);
- }
-
+ public void start() throws Exception {
+ catalogRegister.init(config);
+ executorService.scheduleWithFixedDelay(
+ this::loadMetalake,
+ metadataUpdateIntervalSecond,
+ metadataUpdateIntervalSecond,
+ TimeUnit.SECONDS);
LOG.info("Gravitino CatalogConnectorManager started.");
}
@@ -330,7 +329,15 @@ public class CatalogConnectorManager {
/** Shuts down the catalog connector manager. */
public void shutdown() {
LOG.info("Gravitino CatalogConnectorManager shutdown.");
- throw new NotImplementedException();
+ if (catalogRegister != null) {
+ catalogRegister.close();
+ }
+
+ executorService.shutdown();
+
+ if (gravitinoClient != null) {
+ gravitinoClient.close();
+ }
}
/**
@@ -341,7 +348,9 @@ public class CatalogConnectorManager {
* @return the Trino catalog name
*/
public String getTrinoCatalogName(String metalake, String catalog) {
- return config.singleMetalakeMode() ? catalog : String.format("\"%s.%s\"",
metalake, catalog);
+ return config.singleMetalakeMode()
+ ? catalog
+ : trinoCatalogNameHandler.getCatalogName(metalake, catalog);
}
/**
@@ -369,14 +378,21 @@ public class CatalogConnectorManager {
* @param connectorName the name of the connector
* @param config the Gravitino configuration
* @param context the Trino connector context
- * @return the created connector
+ * @return the created catalog connector context
*/
- public Connector createConnector(
+ public CatalogConnectorContext createCatalogConnectorContext(
String connectorName, GravitinoConfig config, ConnectorContext context) {
try {
String catalogConfig = config.getCatalogConfig();
GravitinoCatalog catalog = GravitinoCatalog.fromJson(catalogConfig);
+ if (this.config.singleMetalakeMode()
+ && StringUtils.isNotBlank(targetMetalake)
+ && !targetMetalake.equals(catalog.getMetalake())) {
+ throw new TrinoException(
+ GravitinoErrorCode.GRAVITINO_UNSUPPORTED_OPERATION,
+ "Multiple metalakes are not supported");
+ }
CatalogConnectorContext.Builder builder =
catalogConnectorFactory.createCatalogConnectorContextBuilder(catalog);
builder
@@ -384,9 +400,10 @@ public class CatalogConnectorManager {
.withContext(context);
CatalogConnectorContext connectorContext = builder.build();
- catalogConnectors.put(connectorName, connectorContext);
+ String fullCatalogName = getTrinoCatalogName(catalog);
+ catalogConnectors.put(fullCatalogName, connectorContext);
LOG.info("Create connector {} successful", connectorName);
- return connectorContext.getConnector();
+ return connectorContext;
} catch (Exception e) {
LOG.error("Failed to create connector: {}", connectorName, e);
throw new TrinoException(
@@ -433,4 +450,8 @@ public class CatalogConnectorManager {
}
return false;
}
+
+ public interface TrinoCatalogNameHandler {
+ String getCatalogName(String metalake, String catalog);
+ }
}
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java
index a66bd2aed6..3e2727ac2b 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorMetadata.java
@@ -326,6 +326,27 @@ public class CatalogConnectorMetadata {
}
}
+ /**
+ * Adds a new column to the specified table with a position hint.
+ *
+ * @param schemaTableName the name of the schema and table
+ * @param column the Gravitino column to add
+ * @param position the target position for the new column
+ */
+ public void addColumn(
+ SchemaTableName schemaTableName,
+ GravitinoColumn column,
+ TableChange.ColumnPosition position) {
+ String[] columnNames = {column.getName()};
+ String comment = Strings.isNullOrEmpty(column.getComment()) ? null :
column.getComment();
+ TableChange.ColumnPosition targetPosition =
+ position == null ? TableChange.ColumnPosition.defaultPos() : position;
+ applyAlter(
+ schemaTableName,
+ TableChange.addColumn(
+ columnNames, column.getType(), comment, targetPosition,
column.isNullable()));
+ }
+
/**
* Drops a column from the specified table.
*
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogRegister.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogRegister.java
index 5e26bf700d..0b79d1af33 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogRegister.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogRegister.java
@@ -23,7 +23,6 @@ import static
org.apache.gravitino.trino.connector.GravitinoConfig.GRAVITINO_DYN
import io.trino.jdbc.TrinoDriver;
import io.trino.spi.TrinoException;
-import io.trino.spi.connector.ConnectorContext;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -47,61 +46,14 @@ public class CatalogRegister {
private static final Logger LOG =
LoggerFactory.getLogger(CatalogRegister.class);
- private static final int MIN_SUPPORT_TRINO_SPI_VERSION = 435;
- private static final int MAX_SUPPORT_TRINO_SPI_VERSION = 439;
- private static final int
MIN_SUPPORT_CATALOG_NAME_WITH_METALAKE_TRINO_SPI_VERSION = 446;
private static final int EXECUTE_QUERY_MAX_RETRIES = 6;
private static final int EXECUTE_QUERY_BACKOFF_TIME_SECOND = 5;
- private String trinoVersion;
private Connection connection;
- private boolean isCoordinator;
private boolean isStarted = false;
private String catalogStoreDirectory;
private GravitinoConfig config;
- private void checkTrinoSpiVersion(ConnectorContext context) {
- this.trinoVersion = context.getSpiVersion();
-
- int version = Integer.parseInt(trinoVersion);
-
- if (version < MIN_SUPPORT_TRINO_SPI_VERSION || version >
MAX_SUPPORT_TRINO_SPI_VERSION) {
- Boolean skipTrinoVersionValidation =
config.isSkipTrinoVersionValidation();
- if (!skipTrinoVersionValidation) {
- String errmsg =
- String.format(
- "Unsupported Trino-%s version. The Supported version for the
Gravitino-Trino-connector from Trino-%d to Trino-%d."
- + "Maybe you can set
gravitino.trino.skip-version-validation to skip version validation.",
- trinoVersion, MIN_SUPPORT_TRINO_SPI_VERSION,
MAX_SUPPORT_TRINO_SPI_VERSION);
- throw new
TrinoException(GravitinoErrorCode.GRAVITINO_UNSUPPORTED_TRINO_VERSION, errmsg);
- } else {
- LOG.warn(
- "The version %s has not undergone thorough testing with Gravitino,
there may be compatiablity problem.",
- trinoVersion);
- }
- }
-
- isCoordinator = context.getNodeManager().getCurrentNode().isCoordinator();
- }
-
- private void checkSupportCatalogNameWithMetalake(
- ConnectorContext context, GravitinoConfig config) {
- if (!config.singleMetalakeMode()) {
- int version = Integer.parseInt(context.getSpiVersion());
- if (version < MIN_SUPPORT_CATALOG_NAME_WITH_METALAKE_TRINO_SPI_VERSION) {
- LOG.warn(
- "Trino-{} does not support catalog name with dots, The minimal
required version is Trino-{}."
- + "Some errors may occur when using the USE <CATALOG>.<SCHEMA>
statement in Trino",
- trinoVersion,
- MIN_SUPPORT_CATALOG_NAME_WITH_METALAKE_TRINO_SPI_VERSION);
- }
- }
- }
-
- boolean isCoordinator() {
- return isCoordinator;
- }
-
boolean isTrinoStarted() {
if (isStarted) {
return true;
@@ -121,14 +73,11 @@ public class CatalogRegister {
* Initializes the catalog register with the specified Trino connector
context and Gravitino
* configuration.
*
- * @param context the Trino connector context
* @param config the Gravitino configuration
* @throws Exception if the catalog register fails to initialize
*/
- public void init(ConnectorContext context, GravitinoConfig config) throws
Exception {
+ public void init(GravitinoConfig config) throws Exception {
this.config = config;
- checkTrinoSpiVersion(context);
- checkSupportCatalogNameWithMetalake(context, config);
TrinoDriver driver = new TrinoDriver();
DriverManager.registerDriver(driver);
@@ -203,6 +152,8 @@ public class CatalogRegister {
String createCatalogCommand = generateCreateCatalogCommand(name,
catalog);
executeSql(createCatalogCommand);
LOG.info("Register catalog {} successfully: {}", name,
createCatalogCommand);
+ } catch (SQLException e) {
+ throw new TrinoException(GravitinoErrorCode.GRAVITINO_RUNTIME_ERROR,
e.getMessage(), e);
} catch (Exception e) {
String message = String.format("Failed to register catalog %s", name);
LOG.error(message);
@@ -211,7 +162,7 @@ public class CatalogRegister {
}
private boolean checkCatalogExist(String name) {
- String showCatalogCommand = String.format("SHOW CATALOGS like '%s'", name);
+ String showCatalogCommand = "SHOW CATALOGS";
Exception failedException = null;
try {
int retries = EXECUTE_QUERY_MAX_RETRIES;
@@ -222,11 +173,17 @@ public class CatalogRegister {
ResultSet rs = statement.getResultSet();
while (rs.next()) {
String catalogName = rs.getString(1);
- if (catalogName.equals(name) || catalogName.equals("\"" + name +
"\"")) {
+ // In some Trino version catalog name may be quoted, so we need to
check both quoted and
+ // unquoted names
+ if (name.equals(catalogName)
+ || name.equals("\"" + catalogName + "\"")
+ || ("\"" + name + "\"").equals(catalogName)) {
return true;
}
}
return false;
+ } catch (SQLException e) {
+ throw e;
} catch (Exception e) {
failedException = e;
LOG.warn("Execute command failed: {}, ", showCatalogCommand, e);
@@ -252,6 +209,8 @@ public class CatalogRegister {
// check the catalog is already created
statement.execute(sql);
return;
+ } catch (SQLException e) {
+ throw e;
} catch (Exception e) {
failedException = e;
LOG.warn("Execute command failed: {}, ", sql, e);
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/system/GravitinoSystemConnector.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/system/GravitinoSystemConnector.java
index c6b894170c..57ee95c0ea 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/system/GravitinoSystemConnector.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/system/GravitinoSystemConnector.java
@@ -18,10 +18,13 @@
*/
package org.apache.gravitino.trino.connector.system;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.spi.HostAddress;
import io.trino.spi.Page;
+import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorMetadata;
@@ -79,16 +82,30 @@ public class GravitinoSystemConnector implements Connector {
@Override
public ConnectorMetadata getMetadata(
ConnectorSession session, ConnectorTransactionHandle transactionHandle) {
+ return createMetadata();
+ }
+
+ protected ConnectorMetadata createMetadata() {
return new GravitinoSystemConnectorMetadata();
}
@Override
public ConnectorSplitManager getSplitManager() {
- return new SplitManager();
+ return createSplitManager();
}
@Override
public ConnectorPageSourceProvider getPageSourceProvider() {
+ return createPageSourceProvider();
+ }
+
+ public void shutdown() {}
+
+ protected ConnectorSplitManager createSplitManager() {
+ return new SplitManager();
+ }
+
+ protected ConnectorPageSourceProvider createPageSourceProvider() {
return new DatasourceProvider();
}
@@ -112,7 +129,11 @@ public class GravitinoSystemConnector implements Connector
{
SchemaTableName tableName =
((GravitinoSystemConnectorMetadata.SystemTableHandle)
table).getName();
- return new
SystemTablePageSource(GravitinoSystemTableFactory.loadPageData(tableName));
+ return
createPageSource(GravitinoSystemTableFactory.loadPageData(tableName));
+ }
+
+ protected ConnectorPageSource createPageSource(Page page) {
+ throw new TrinoException(NOT_SUPPORTED, "Should be overridden in
subclass");
}
}
@@ -129,13 +150,18 @@ public class GravitinoSystemConnector implements
Connector {
SchemaTableName tableName =
((GravitinoSystemConnectorMetadata.SystemTableHandle)
connectorTableHandle).getName();
- return new FixedSplitSource(new Split(tableName));
+ return new FixedSplitSource(createSplit(tableName));
+ }
+
+ protected ConnectorSplit createSplit(SchemaTableName tableName) {
+ throw new TrinoException(NOT_SUPPORTED, "Should be overridden in
subclass");
}
}
/** The split. */
- public static class Split implements ConnectorSplit {
- private final SchemaTableName tableName;
+ public abstract static class Split implements ConnectorSplit {
+
+ protected final SchemaTableName tableName;
/**
* Constructs a new Split with the specified table name.
@@ -166,18 +192,13 @@ public class GravitinoSystemConnector implements
Connector {
public List<HostAddress> getAddresses() {
return Collections.emptyList();
}
-
- @Override
- public Object getInfo() {
- return this;
- }
}
/** The system table page source. */
- public static class SystemTablePageSource implements ConnectorPageSource {
+ public abstract static class SystemTablePageSource implements
ConnectorPageSource {
- private boolean isFinished = false;
- private final Page page;
+ protected boolean isFinished = false;
+ protected final Page page;
/**
* Constructs a new SystemTablePageSource.
@@ -203,8 +224,7 @@ public class GravitinoSystemConnector implements Connector {
return isFinished;
}
- @Override
- public Page getNextPage() {
+ public Page nextPage() {
if (isFinished) {
return null;
}
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/BlockJsonSerde.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/BlockJsonSerde.java
index 2ed83b1be8..e100665668 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/BlockJsonSerde.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/util/json/BlockJsonSerde.java
@@ -37,6 +37,7 @@ import java.lang.reflect.Method;
*/
public final class BlockJsonSerde {
private static final String BLOCK_SERDE_UTIL_CLASS_NAME =
"io.trino.block.BlockSerdeUtil";
+ private static final int DEFAULT_BLOCK_ENCODING_NAME_LENGTH = 64;
/**
* Jackson serializer for Trino {@link Block} objects. Serializes block
instances using Trino's
@@ -65,11 +66,13 @@ public final class BlockJsonSerde {
public void serialize(
Block block, JsonGenerator jsonGenerator, SerializerProvider
serializerProvider)
throws IOException {
- // Encoding name is length prefixed as are many block encodings
- SliceOutput output =
- new DynamicSliceOutput(
- toIntExact(
- block.getSizeInBytes() + block.getEncodingName().length() +
(2 * Integer.BYTES)));
+ // estimate size to avoid multiple resizes, encoding name is length
prefixed as are many block
+ // like : SIZE_OF_INT + blockEncoding.getName().length() +
block.getSizeInBytes();
+ // we use a default length for encoding name to avoid calculating actual
length
+ int estimatedSize =
+ toIntExact(
+ block.getSizeInBytes() + (2 * Integer.BYTES) +
DEFAULT_BLOCK_ENCODING_NAME_LENGTH);
+ SliceOutput output = new DynamicSliceOutput(estimatedSize);
try {
writeBlock.invoke(null, blockEncodingSerde, output, block);
diff --git
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/AbstractGravitinoConnectorTest.java
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/AbstractGravitinoConnectorTest.java
new file mode 100644
index 0000000000..ac0d223cb3
--- /dev/null
+++
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/AbstractGravitinoConnectorTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector;
+
+import io.trino.plugin.memory.MemoryPlugin;
+import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.DistributedQueryRunner;
+import io.trino.testing.QueryRunner;
+import java.util.concurrent.TimeUnit;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager;
+import org.awaitility.Awaitility;
+
+abstract class AbstractGravitinoConnectorTest extends
AbstractTestQueryFramework {
+
+ GravitinoMockServer server;
+ int trinoVersion;
+
+ @Override
+ protected QueryRunner createQueryRunner() throws Exception {
+ GravitinoAdminClient gravitinoClient = createGravitinoClient();
+ try {
+
+ DistributedQueryRunner queryRunner = createTrinoQueryRunner();
+
+ GravitinoPlugin gravitinoPlugin = createGravitinoPlugin(gravitinoClient);
+ queryRunner.installPlugin(gravitinoPlugin);
+
+ configureCatalogs(queryRunner, gravitinoClient);
+
+
GravitinoConnectorPluginManager.instance(this.getClass().getClassLoader())
+ .installPlugin("memory", new MemoryPlugin());
+ CatalogConnectorManager catalogConnectorManager =
+ gravitinoPlugin.getCatalogConnectorManager();
+ trinoVersion = gravitinoPlugin.getTrinoVersion();
+ server.setCatalogConnectorManager(catalogConnectorManager, trinoVersion);
+
+ // Wait for the catalog to be created. Wait for at least 30 seconds.
+ Awaitility.await()
+ .atMost(30, TimeUnit.SECONDS)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .until(() -> !catalogConnectorManager.getCatalogs().isEmpty());
+
+ return queryRunner;
+ } catch (Exception e) {
+ throw new RuntimeException("Create query runner failed", e);
+ }
+ }
+
+ protected GravitinoPlugin createGravitinoPlugin(GravitinoAdminClient client)
{
+ return new GravitinoPlugin(client);
+ }
+
+ protected abstract DistributedQueryRunner createTrinoQueryRunner() throws
Exception;
+
+ protected GravitinoAdminClient createGravitinoClient() {
+ server = closeAfterClass(new GravitinoMockServer());
+ return server.createGravitinoClient();
+ }
+
+ protected abstract void configureCatalogs(
+ DistributedQueryRunner queryRunner, GravitinoAdminClient
gravitinoClient) throws Exception;
+}
diff --git
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/GravitinoMockServer.java
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/GravitinoMockServer.java
index b76eea855f..2f8097f88b 100644
---
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/GravitinoMockServer.java
+++
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/GravitinoMockServer.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.trino.connector;
import static java.util.Collections.emptyMap;
+import static
org.apache.gravitino.trino.connector.TestGravitinoConnector.SPI_VERSION_SUPPORT_ADD_COLUMN_WITH_POSITION;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyMap;
@@ -30,7 +31,9 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.trino.plugin.memory.MemoryConnector;
import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.SaveMode;
@@ -79,19 +82,32 @@ public class GravitinoMockServer implements AutoCloseable {
private boolean start = true;
CatalogConnectorManager catalogConnectorManager;
private GeneralDataTypeTransformer dataTypeTransformer = new
HiveDataTypeTransformer();
+ private int trinoVersion = 0;
public GravitinoMockServer() {
createMetalake(testMetalake);
createCatalog(testMetalake, testCatalog, ImmutableMap.of());
}
- public void setCatalogConnectorManager(CatalogConnectorManager
catalogConnectorManager) {
+ public void setCatalogConnectorManager(
+ CatalogConnectorManager catalogConnectorManager, int trinoVersion) {
this.catalogConnectorManager = catalogConnectorManager;
+ this.trinoVersion = trinoVersion;
}
public GravitinoAdminClient createGravitinoClient() {
GravitinoAdminClient client = mock(GravitinoAdminClient.class);
+ when(client.listMetalakes())
+ .thenAnswer(
+ new Answer<GravitinoMetalake[]>() {
+ @Override
+ public GravitinoMetalake[] answer(InvocationOnMock invocation)
throws Throwable {
+ return metalakes.values().stream()
+ .map(metalake -> metalake.metalake)
+ .toArray(GravitinoMetalake[]::new);
+ }
+ });
when(client.createMetalake(anyString(), anyString(), anyMap()))
.thenAnswer(
new Answer<GravitinoMetalake>() {
@@ -558,7 +574,7 @@ public class GravitinoMockServer implements AutoCloseable {
catalogConnectorManager
.getCatalogConnector(catalogConnectorManager.getTrinoCatalogName(catalog))
.getMetadataAdapter();
- metadata.addColumn(null, tableHandle,
metadataAdapter.getColumnMetadata(column));
+ addColumn(metadata, tableHandle,
metadataAdapter.getColumnMetadata(column));
} else if (tableChange instanceof TableChange.DeleteColumn) {
TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn)
tableChange;
@@ -603,6 +619,42 @@ public class GravitinoMockServer implements AutoCloseable {
}
}
+ private void addColumn(
+ ConnectorMetadata metadata, ConnectorTableHandle tableHandle,
ColumnMetadata columnMetadata) {
+ if (trinoVersion < SPI_VERSION_SUPPORT_ADD_COLUMN_WITH_POSITION) {
+ try {
+ metadata
+ .getClass()
+ .getMethod(
+ "addColumn",
+ ConnectorSession.class,
+ ConnectorTableHandle.class,
+ ColumnMetadata.class)
+ .invoke(metadata, null, tableHandle, columnMetadata);
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException("Failed to invoke legacy addColumn by
reflection", e);
+ }
+ } else {
+ try {
+ Class<?> columnPositionClass =
Class.forName("io.trino.spi.connector.ColumnPosition");
+ Class<?> lastPositionClass =
Class.forName("io.trino.spi.connector.ColumnPosition$Last");
+ Object position =
lastPositionClass.getDeclaredConstructor().newInstance();
+
+ metadata
+ .getClass()
+ .getMethod(
+ "addColumn",
+ ConnectorSession.class,
+ ConnectorTableHandle.class,
+ ColumnMetadata.class,
+ columnPositionClass)
+ .invoke(metadata, null, tableHandle, columnMetadata, position);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to invoke addColumn with
ColumnPosition", e);
+ }
+ }
+ }
+
@ResourcePresence
public boolean isRunning() {
return start;
diff --git
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestCreateGravitinoConnector.java
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestCreateGravitinoConnector.java
deleted file mode 100644
index 2845d3c604..0000000000
---
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestCreateGravitinoConnector.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.trino.connector;
-
-import static io.trino.testing.TestingSession.testSessionBuilder;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import io.trino.Session;
-import io.trino.testing.DistributedQueryRunner;
-import io.trino.testing.QueryRunner;
-import java.util.HashMap;
-import org.apache.gravitino.client.GravitinoAdminClient;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-
-@Disabled
-public class TestCreateGravitinoConnector {
-
- GravitinoMockServer server;
-
- @Test
- public void testCreateConnectorsWithEnableSimpleCatalog() throws Exception {
- server = new GravitinoMockServer();
- Session session = testSessionBuilder().setCatalog("gravitino").build();
- QueryRunner queryRunner =
DistributedQueryRunner.builder(session).setNodeCount(1).build();
-
- GravitinoAdminClient gravitinoClient = server.createGravitinoClient();
- TestGravitinoPlugin gravitinoPlugin = new
TestGravitinoPlugin(gravitinoClient);
- queryRunner.installPlugin(gravitinoPlugin);
-
- // test create two connector and set gravitino.simplify-catalog-names =
true
- {
- // create a gravitino connector named gravitino using metalake test
- HashMap<String, String> properties = new HashMap<>();
- properties.put("gravitino.metalake", "test");
- properties.put("gravitino.uri", "http://127.0.0.1:8090");
- properties.put("gravitino.simplify-catalog-names", "true");
- queryRunner.createCatalog("test0", "gravitino", properties);
- }
-
- {
- // Test failed to create catalog with different metalake
- HashMap<String, String> properties = new HashMap<>();
- properties.put("gravitino.metalake", "test1");
- properties.put("gravitino.uri", "http://127.0.0.1:8090");
- properties.put("gravitino.simplify-catalog-names", "true");
- try {
- queryRunner.createCatalog("test1", "gravitino", properties);
- } catch (Exception e) {
- assertThat(e.getMessage()).contains("Multiple metalakes are not
supported");
- }
- }
-
- server.close();
- }
-
- @Test
- public void testCreateConnectorsWithDisableSimpleCatalog() throws Exception {
- server = new GravitinoMockServer();
- Session session = testSessionBuilder().setCatalog("gravitino").build();
- QueryRunner queryRunner =
DistributedQueryRunner.builder(session).setNodeCount(1).build();
-
- GravitinoAdminClient gravitinoClient = server.createGravitinoClient();
- TestGravitinoPlugin gravitinoPlugin = new
TestGravitinoPlugin(gravitinoClient);
- queryRunner.installPlugin(gravitinoPlugin);
-
- // test create two connector and set gravitino.simplify-catalog-names =
false
- {
- // create a gravitino connector named gravitino using metalake test
- HashMap<String, String> properties = new HashMap<>();
- properties.put("gravitino.metalake", "test");
- properties.put("gravitino.uri", "http://127.0.0.1:8090");
- properties.put("gravitino.simplify-catalog-names", "false");
- queryRunner.createCatalog("test0", "gravitino", properties);
- }
-
- {
- // Test failed to create catalog with different metalake
- HashMap<String, String> properties = new HashMap<>();
- properties.put("gravitino.metalake", "test1");
- properties.put("gravitino.uri", "http://127.0.0.1:8090");
- properties.put("gravitino.simplify-catalog-names", "false");
- queryRunner.createCatalog("test1", "gravitino", properties);
- }
-
- server.close();
- }
-}
diff --git
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConfig.java
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConfig.java
index e4a8c9e5fe..f79fa1bfe2 100644
---
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConfig.java
+++
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConfig.java
@@ -28,18 +28,10 @@ import com.google.common.collect.ImmutableMap;
import io.trino.spi.TrinoException;
import java.util.Map;
import java.util.regex.Pattern;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.jupiter.api.Test;
public class TestGravitinoConfig {
- @BeforeClass
- public static void startup() throws Exception {}
-
- @AfterClass
- public static void shutdown() throws Exception {}
-
@Test
public void testGravitinoConfig() {
String gravitinoUrl = "http://127.0.0.1:8000";
diff --git
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnector.java
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnector.java
index 30ac739009..99a7f803ac 100644
---
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnector.java
+++
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnector.java
@@ -18,72 +18,37 @@
*/
package org.apache.gravitino.trino.connector;
-import static io.trino.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.google.common.base.Preconditions;
-import io.trino.Session;
-import io.trino.plugin.memory.MemoryPlugin;
-import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.MaterializedResult;
import io.trino.testing.MaterializedRow;
-import io.trino.testing.QueryRunner;
import java.util.HashMap;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.gravitino.client.GravitinoAdminClient;
-import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager;
-import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
-public class TestGravitinoConnector extends AbstractTestQueryFramework {
+public abstract class TestGravitinoConnector extends
AbstractGravitinoConnectorTest {
- GravitinoMockServer server;
+ public static final int SPI_VERSION_TEST_SUPPORT_RENAME_COLUMN = 452;
+ public static final int SPI_VERSION_SUPPORT_ADD_COLUMN_WITH_POSITION = 468;
+ public static final int SPI_VERSION_TEST_SUPPORT_ADD_COLUMN = 469;
@Override
- protected QueryRunner createQueryRunner() throws Exception {
- server = closeAfterClass(new GravitinoMockServer());
- GravitinoAdminClient gravitinoClient = server.createGravitinoClient();
-
- Session session = testSessionBuilder().setCatalog("gravitino").build();
- try {
-
- DistributedQueryRunner queryRunner =
- DistributedQueryRunner.builder(session).setNodeCount(1).build();
-
- TestGravitinoPlugin gravitinoPlugin = new
TestGravitinoPlugin(gravitinoClient);
- queryRunner.installPlugin(gravitinoPlugin);
-
- // create a gravitino connector named gravitino using metalake test
- HashMap<String, String> properties = new HashMap<>();
- properties.put("gravitino.metalake", "test");
- properties.put("gravitino.uri", "http://127.0.0.1:8090");
- properties.put(
- "catalog.config-dir",
queryRunner.getCoordinator().getBaseDataDir().toString());
- properties.put("discovery.uri",
queryRunner.getCoordinator().getBaseUrl().toString());
- queryRunner.createCatalog("gravitino", "gravitino", properties);
-
-
GravitinoConnectorPluginManager.instance(this.getClass().getClassLoader())
- .installPlugin("memory", new MemoryPlugin());
- CatalogConnectorManager catalogConnectorManager =
- gravitinoPlugin.getCatalogConnectorManager();
- server.setCatalogConnectorManager(catalogConnectorManager);
-
- // Wait for the catalog to be created. Wait for at least 30 seconds.
- Awaitility.await()
- .atMost(30, TimeUnit.SECONDS)
- .pollInterval(1, TimeUnit.SECONDS)
- .until(() -> !catalogConnectorManager.getCatalogs().isEmpty());
-
- return queryRunner;
- } catch (Exception e) {
- throw new RuntimeException("Create query runner failed", e);
- }
+ protected void configureCatalogs(
+ DistributedQueryRunner queryRunner, GravitinoAdminClient
gravitinoClient) {
+ // create a gravitino connector named gravitino using metalake test
+ HashMap<String, String> properties = new HashMap<>();
+ properties.put("gravitino.metalake", "test");
+ properties.put("gravitino.uri", "http://127.0.0.1:8090");
+ properties.put("catalog.config-dir",
queryRunner.getCoordinator().getBaseDataDir().toString());
+ properties.put("discovery.uri",
queryRunner.getCoordinator().getBaseUrl().toString());
+ queryRunner.createCatalog("gravitino", "gravitino", properties);
}
@Test
@@ -189,38 +154,51 @@ public class TestGravitinoConnector extends
AbstractTestQueryFramework {
createTestTable(fullTableName1);
- // test add column and drop column, but the memory connector is not
supported these operations.
- assertQueryFails(
- String.format("alter table %s add column if not exists c varchar",
fullTableName1),
- format("This connector does not support adding columns"));
-
- assertQueryFails(
- String.format("alter table %s drop column a", fullTableName1),
- format("This connector does not support dropping columns"));
-
// test set table comment
assertUpdate(String.format("comment on table %s is 'test table comments'",
fullTableName1));
assertThat((String) computeScalar("show create table " + fullTableName1))
.contains("COMMENT 'test table comments'");
- // test rename column, but the memory connector is not supported these
operations.
- assertQueryFails(
- String.format("alter table %s rename column a to c ", fullTableName1),
- format("This connector does not support renaming columns"));
-
- assertQueryFails(
- String.format("alter table %s alter column a set DATA TYPE int",
fullTableName1),
- format("This connector does not support setting column types"));
-
// test set column comment
assertUpdate(String.format("comment on column %s.a is 'test column
comments'", fullTableName1));
assertThat((String) computeScalar("show create table " + fullTableName1))
.contains("COMMENT 'test column comments'");
+ // test add column and drop column, but the memory connector is not
supported these operations.
+ if (trinoVersion < SPI_VERSION_TEST_SUPPORT_ADD_COLUMN) {
+ assertQueryFails(
+ String.format("alter table %s add column if not exists c varchar",
fullTableName1),
+ "This connector does not support adding columns");
+ } else {
+ assertUpdate(
+ String.format("alter table %s add column if not exists c varchar",
fullTableName1));
+ assertThat((String) computeScalar("show create table " + fullTableName1))
+ .contains("c varchar");
+ }
+
+ assertQueryFails(
+ String.format("alter table %s drop column a", fullTableName1),
+ "This connector does not support dropping columns");
+
+ // test rename column, but the memory connector is not supported these
operations.
+ if (trinoVersion < SPI_VERSION_TEST_SUPPORT_RENAME_COLUMN) {
+ assertQueryFails(
+ String.format("alter table %s rename column b to d ",
fullTableName1),
+ "This connector does not support renaming columns");
+ } else {
+ assertUpdate(String.format("alter table %s rename column b to d ",
fullTableName1));
+ assertThat((String) computeScalar("show create table " + fullTableName1))
+ .contains("d integer");
+ }
+
+ assertQueryFails(
+ String.format("alter table %s alter column a set DATA TYPE int",
fullTableName1),
+ "This connector does not support setting column types");
+
// test set table properties, but the memory connector is not supported
these operations.
assertQueryFails(
String.format("alter table %s set properties \"max_ttl\" = 20",
fullTableName1),
- format("This connector does not support setting table properties"));
+ "This connector does not support setting table properties");
dropTestTable(fullTableName1);
}
diff --git
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorFactory.java
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorFactory.java
deleted file mode 100644
index 8b802d53b0..0000000000
---
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.trino.connector;
-
-import java.util.function.Supplier;
-import org.apache.gravitino.client.GravitinoAdminClient;
-
-public class TestGravitinoConnectorFactory extends GravitinoConnectorFactory {
- private GravitinoAdminClient gravitinoClient;
-
- public void setGravitinoClient(GravitinoAdminClient client) {
- this.gravitinoClient = client;
- }
-
- @Override
- Supplier<GravitinoAdminClient> clientProvider() {
- return () -> gravitinoClient;
- }
-}
diff --git
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorNullChecks.java
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorNullChecks.java
index bf69565b88..1c1bbcb8b6 100644
---
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorNullChecks.java
+++
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorNullChecks.java
@@ -26,16 +26,27 @@ import static org.mockito.Mockito.when;
import io.trino.spi.connector.Connector;
import io.trino.spi.transaction.IsolationLevel;
+import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.SupportsSchemas;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.rel.TableCatalog;
import org.apache.gravitino.trino.connector.catalog.CatalogConnectorContext;
+import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog;
import org.junit.jupiter.api.Test;
class TestGravitinoConnectorNullChecks {
@Test
void testBeginTransactionThrowsIfInternalConnectorIsNull() {
+ GravitinoCatalog mockCatalog = mock(GravitinoCatalog.class);
+
when(mockCatalog.geNameIdentifier()).thenReturn(NameIdentifier.of("metalake",
"catalog"));
+
CatalogConnectorContext mockContext = mock(CatalogConnectorContext.class);
+ when(mockContext.getCatalog()).thenReturn(mockCatalog);
+ GravitinoMetalake metalake = mockMetalake();
+ when(mockContext.getMetalake()).thenReturn(metalake);
when(mockContext.getInternalConnector()).thenReturn(null);
- GravitinoConnector connector = new
GravitinoConnector(mock(NameIdentifier.class), mockContext);
+ GravitinoConnector connector = new GravitinoConnector(mockContext);
assertThrows(
IllegalArgumentException.class,
() -> connector.beginTransaction(IsolationLevel.READ_COMMITTED, true,
true));
@@ -43,14 +54,30 @@ class TestGravitinoConnectorNullChecks {
@Test
void testBeginTransactionThrowsIfInternalTransactionHandleIsNull() {
+ GravitinoCatalog mockCatalog = mock(GravitinoCatalog.class);
+
when(mockCatalog.geNameIdentifier()).thenReturn(NameIdentifier.of("metalake",
"catalog"));
+
CatalogConnectorContext mockContext = mock(CatalogConnectorContext.class);
+ when(mockContext.getCatalog()).thenReturn(mockCatalog);
+ GravitinoMetalake metalake = mockMetalake();
+ when(mockContext.getMetalake()).thenReturn(metalake);
+
Connector mockInternalConnector = mock(Connector.class);
when(mockContext.getInternalConnector()).thenReturn(mockInternalConnector);
when(mockInternalConnector.beginTransaction(any(), anyBoolean(),
anyBoolean()))
.thenReturn(null);
- GravitinoConnector connector = new
GravitinoConnector(mock(NameIdentifier.class), mockContext);
+ GravitinoConnector connector = new GravitinoConnector(mockContext);
assertThrows(
IllegalArgumentException.class,
() -> connector.beginTransaction(IsolationLevel.READ_COMMITTED, true,
true));
}
+
+ private static GravitinoMetalake mockMetalake() {
+ GravitinoMetalake metalake = mock(GravitinoMetalake.class);
+ Catalog catalog = mock(Catalog.class);
+ when(catalog.asSchemas()).thenReturn(mock(SupportsSchemas.class));
+ when(catalog.asTableCatalog()).thenReturn(mock(TableCatalog.class));
+ when(metalake.loadCatalog(any())).thenReturn(catalog);
+ return metalake;
+ }
}
diff --git
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithMetalakeCatalogName.java
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithMetalakeCatalogName.java
index c60c5910b8..b1cef8e4cc 100644
---
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithMetalakeCatalogName.java
+++
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithMetalakeCatalogName.java
@@ -18,91 +18,43 @@
*/
package org.apache.gravitino.trino.connector;
-import static io.trino.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
-import io.trino.Session;
-import io.trino.plugin.memory.MemoryPlugin;
-import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.MaterializedResult;
import io.trino.testing.MaterializedRow;
-import io.trino.testing.QueryRunner;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import org.apache.gravitino.client.GravitinoAdminClient;
-import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager;
-import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
-@Disabled
-public class TestGravitinoConnectorWithMetalakeCatalogName extends
AbstractTestQueryFramework {
-
- GravitinoMockServer server;
+public abstract class TestGravitinoConnectorWithMetalakeCatalogName
+ extends AbstractGravitinoConnectorTest {
@Override
- protected QueryRunner createQueryRunner() throws Exception {
- server = closeAfterClass(new GravitinoMockServer());
- GravitinoAdminClient gravitinoClient = server.createGravitinoClient();
-
- Session session = testSessionBuilder().setCatalog("gravitino").build();
-
- try {
- DistributedQueryRunner queryRunner =
- DistributedQueryRunner.builder(session).setNodeCount(1).build();
-
- TestGravitinoPlugin gravitinoPlugin = new
TestGravitinoPlugin(gravitinoClient);
- queryRunner.installPlugin(gravitinoPlugin);
-
- {
- // create a gravitino connector named gravitino using metalake test
- HashMap<String, String> properties = new HashMap<>();
- properties.put("gravitino.metalake", "test");
- properties.put("gravitino.uri", "http://127.0.0.1:8090");
- properties.put("gravitino.simplify-catalog-names", "false");
- properties.put(
- "trino.catalog.store",
queryRunner.getCoordinator().getBaseDataDir().toString());
- properties.put(
- "trino.jdbc.uri",
-
queryRunner.getCoordinator().getBaseUrl().toString().replace("http",
"jdbc:trino"));
- queryRunner.createCatalog("gravitino", "gravitino", properties);
- }
-
- {
- // create a gravitino connector named test1 using metalake gravitino1
- HashMap<String, String> properties = new HashMap<>();
- properties.put("gravitino.metalake", "test1");
- properties.put("gravitino.uri", "http://127.0.0.1:8090");
- properties.put("gravitino.simplify-catalog-names", "false");
- properties.put(
- "trino.catalog.store",
queryRunner.getCoordinator().getBaseDataDir().toString());
- properties.put(
- "trino.jdbc.uri",
-
queryRunner.getCoordinator().getBaseUrl().toString().replace("http",
"jdbc:trino"));
- queryRunner.createCatalog("gravitino1", "gravitino", properties);
- }
-
-
GravitinoConnectorPluginManager.instance(this.getClass().getClassLoader())
- .installPlugin("memory", new MemoryPlugin());
- CatalogConnectorManager catalogConnectorManager =
- gravitinoPlugin.getCatalogConnectorManager();
- server.setCatalogConnectorManager(catalogConnectorManager);
-
- // Wait for the catalog to be created. Wait for at least 30 seconds.
- Awaitility.await()
- .atMost(30, TimeUnit.SECONDS)
- .pollInterval(1, TimeUnit.SECONDS)
- .until(() -> !catalogConnectorManager.getCatalogs().isEmpty());
- return queryRunner;
-
- } catch (Exception e) {
- throw new RuntimeException("Create query runner failed", e);
- }
+ protected void configureCatalogs(
+ DistributedQueryRunner queryRunner, GravitinoAdminClient
gravitinoClient) {
+ // create a gravitino connector named gravitino using metalake test
+ HashMap<String, String> properties = new HashMap<>();
+ properties.put("gravitino.metalake", "test");
+ properties.put("gravitino.uri", "http://127.0.0.1:8090");
+ properties.put("gravitino.use-single-metalake", "false");
+ properties.put("catalog.config-dir",
queryRunner.getCoordinator().getBaseDataDir().toString());
+ properties.put("discovery.uri",
queryRunner.getCoordinator().getBaseUrl().toString());
+ queryRunner.createCatalog("gravitino", "gravitino", properties);
+
+ // create a gravitino connector named test1 using metalake gravitino1
+ HashMap<String, String> secondProperties = new HashMap<>();
+ secondProperties.put("gravitino.metalake", "test1");
+ secondProperties.put("gravitino.uri", "http://127.0.0.1:8090");
+ secondProperties.put("gravitino.use-single-metalake", "false");
+ secondProperties.put(
+ "catalog.config-dir",
queryRunner.getCoordinator().getBaseDataDir().toString());
+ secondProperties.put("discovery.uri",
queryRunner.getCoordinator().getBaseUrl().toString());
+ queryRunner.createCatalog("gravitino1", "gravitino", secondProperties);
}
@Test
@@ -121,42 +73,49 @@ public class TestGravitinoConnectorWithMetalakeCatalogName
extends AbstractTestQ
// testing the catalogs
assertThat(computeActual("show
catalogs").getOnlyColumnAsSet()).contains("gravitino");
assertThat(computeActual("show
catalogs").getOnlyColumnAsSet()).contains("gravitino1");
- assertThat(computeActual("show
catalogs").getOnlyColumnAsSet()).contains("\"test.memory\"");
+ assertThat(computeActual("show catalogs").getOnlyColumnAsSet())
+ .contains(getTrinoCliCatalogName("test", "memory"));
// testing the gravitino connector framework works.
assertThat(computeActual("select * from
system.jdbc.tables").getRowCount()).isGreaterThan(1);
// test metalake named test. the connector name is gravitino
assertUpdate("call gravitino.system.create_catalog('memory1', 'memory',
Map())");
- assertThat(computeActual("show
catalogs").getOnlyColumnAsSet()).contains("\"test.memory1\"");
+ assertThat(computeActual("show catalogs").getOnlyColumnAsSet())
+ .contains(getTrinoCliCatalogName("test", "memory1"));
assertUpdate("call gravitino.system.drop_catalog('memory1')");
assertThat(computeActual("show catalogs").getOnlyColumnAsSet())
- .doesNotContain("\"test.memory1\"");
+ .doesNotContain(getTrinoCliCatalogName("test", "memory1"));
assertUpdate(
"call gravitino.system.create_catalog("
+ "catalog=>'memory1', provider=>'memory', properties =>
Map(array['max_ttl'], array['10']), ignore_exist => true)");
- assertThat(computeActual("show
catalogs").getOnlyColumnAsSet()).contains("\"test.memory1\"");
+ assertThat(computeActual("show catalogs").getOnlyColumnAsSet())
+ .contains(getTrinoCliCatalogName("test", "memory1"));
assertUpdate(
"call gravitino.system.drop_catalog(catalog => 'memory1',
ignore_not_exist => true)");
assertThat(computeActual("show catalogs").getOnlyColumnAsSet())
- .doesNotContain("\"test.memory1\"");
+ .doesNotContain(getTrinoCliCatalogName("test", "memory1"));
// test metalake named test1. the connector name is gravitino1
GravitinoAdminClient gravitinoClient = server.createGravitinoClient();
gravitinoClient.createMetalake("test1", "", Collections.emptyMap());
assertUpdate("call gravitino1.system.create_catalog('memory1', 'memory',
Map())");
- assertThat(computeActual("show
catalogs").getOnlyColumnAsSet()).contains("\"test1.memory1\"");
+ assertThat(computeActual("show catalogs").getOnlyColumnAsSet())
+ .contains(getTrinoCliCatalogName("test1", "memory1"));
assertUpdate("call gravitino1.system.drop_catalog('memory1')");
assertThat(computeActual("show catalogs").getOnlyColumnAsSet())
- .doesNotContain("\"test1.memory1\"");
+ .doesNotContain(getTrinoCliCatalogName("test1", "memory1"));
}
@Test
public void testCreateTable() {
- String fullSchemaName = "\"test.memory\".db_01";
+ assertThat(computeActual("show catalogs").getOnlyColumnAsSet())
+ .contains(getTrinoCliCatalogName("test", "memory"));
+
+ String fullSchemaName = getTrinoSqlCatalogName("test", "memory") +
".db_01";
String tableName = "tb_01";
String fullTableName = fullSchemaName + "." + tableName;
@@ -178,4 +137,12 @@ public class TestGravitinoConnectorWithMetalakeCatalogName
extends AbstractTestQ
assertUpdate("drop table " + fullTableName);
assertUpdate("drop schema " + fullSchemaName);
}
+
+ protected String getTrinoCliCatalogName(String metalakeName, String
catalogName) {
+ return "\"" + metalakeName + "." + catalogName + "\"";
+ }
+
+ protected String getTrinoSqlCatalogName(String metalakeName, String
catalogName) {
+ return "\"\"\"" + metalakeName + "." + catalogName + "\"\"\"";
+ }
}
diff --git
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithSkipCatalog.java
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithSkipCatalog.java
deleted file mode 100644
index c3ad8f63c4..0000000000
---
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorWithSkipCatalog.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.trino.connector;
-
-import static io.trino.testing.TestingSession.testSessionBuilder;
-import static org.apache.gravitino.Catalog.Type.RELATIONAL;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import io.trino.Session;
-import io.trino.plugin.memory.MemoryPlugin;
-import io.trino.testing.AbstractTestQueryFramework;
-import io.trino.testing.DistributedQueryRunner;
-import io.trino.testing.MaterializedResult;
-import io.trino.testing.QueryRunner;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import org.apache.gravitino.client.GravitinoAdminClient;
-import org.apache.gravitino.client.GravitinoMetalake;
-import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager;
-import org.awaitility.Awaitility;
-import org.junit.jupiter.api.Test;
-
-public class TestGravitinoConnectorWithSkipCatalog extends
AbstractTestQueryFramework {
-
- GravitinoMockServer server;
-
- @Override
- protected QueryRunner createQueryRunner() throws Exception {
- GravitinoAdminClient gravitinoClient = initGravitinoMockServer();
- Session session = testSessionBuilder().setCatalog("gravitino").build();
-
- try {
- DistributedQueryRunner queryRunner =
- DistributedQueryRunner.builder(session).setNodeCount(1).build();
-
- TestGravitinoPlugin gravitinoPlugin = new
TestGravitinoPlugin(gravitinoClient);
- queryRunner.installPlugin(gravitinoPlugin);
-
- {
- // create a gravitino connector with single metalake
- HashMap<String, String> properties = new HashMap<>();
- properties.put("gravitino.metalake", "test1");
- properties.put("gravitino.uri", "http://127.0.0.1:8090");
- properties.put("gravitino.trino.skip-catalog-patterns", "a.*, b1");
- properties.put(
- "catalog.config-dir",
queryRunner.getCoordinator().getBaseDataDir().toString());
- properties.put("discovery.uri",
queryRunner.getCoordinator().getBaseUrl().toString());
- queryRunner.createCatalog("gravitino", "gravitino", properties);
- }
-
-
GravitinoConnectorPluginManager.instance(this.getClass().getClassLoader())
- .installPlugin("memory", new MemoryPlugin());
- CatalogConnectorManager catalogConnectorManager =
- gravitinoPlugin.getCatalogConnectorManager();
- server.setCatalogConnectorManager(catalogConnectorManager);
-
- // Wait for the catalog to be created. Wait for at least 30 seconds.
- Awaitility.await()
- .atMost(30, TimeUnit.SECONDS)
- .pollInterval(1, TimeUnit.SECONDS)
- .until(() -> !catalogConnectorManager.getCatalogs().isEmpty());
- return queryRunner;
-
- } catch (Exception e) {
- throw new RuntimeException("Create query runner failed", e);
- }
- }
-
- @Test
- public void testShowCatalogsFilteredBySkipPatterns() throws Exception {
- MaterializedResult expectedResult = computeActual("show catalogs");
- assertEquals(expectedResult.getMaterializedRows().size(), 3);
- List<String> catalogs =
- expectedResult.getMaterializedRows().stream()
- .map(row -> (String) row.getField(0))
- .collect(Collectors.toList());
- assertFalse(catalogs.contains("a1"));
- assertFalse(catalogs.contains("b1"));
- assertTrue(catalogs.contains("b2"));
- }
-
- @Test
- public void testSystemTableQueryFilteredBySkipPatterns() throws Exception {
- MaterializedResult expectedResult = computeActual("select * from
gravitino.system.catalog");
- assertEquals(expectedResult.getMaterializedRows().size(), 1);
- List<String> catalogs =
- expectedResult.getMaterializedRows().stream()
- .map(row -> (String) row.getField(0))
- .collect(Collectors.toList());
- assertFalse(catalogs.contains("a1"));
- assertFalse(catalogs.contains("b1"));
- assertTrue(catalogs.contains("b2"));
- }
-
- private GravitinoAdminClient initGravitinoMockServer() {
- GravitinoMockServer gravitinoMockServer = new GravitinoMockServer();
- server = closeAfterClass(gravitinoMockServer);
- GravitinoAdminClient gravitinoClient = server.createGravitinoClient();
-
- gravitinoClient.createMetalake("test1", "", Collections.emptyMap());
- GravitinoMetalake metalake = gravitinoClient.loadMetalake("test1");
- metalake.createCatalog("a1", RELATIONAL, "", "", Collections.emptyMap());
- metalake.createCatalog("b1", RELATIONAL, "", "", Collections.emptyMap());
- metalake.createCatalog("b2", RELATIONAL, "", "", Collections.emptyMap());
- return gravitinoClient;
- }
-}
diff --git
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataGetSystemTable.java
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataGetSystemTable.java
index cfd846b05b..e3c4403336 100644
---
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataGetSystemTable.java
+++
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoMetadataGetSystemTable.java
@@ -51,7 +51,7 @@ public class TestGravitinoMetadataGetSystemTable {
.thenReturn(Optional.of(mockSystemTable));
GravitinoMetadata gravitinoMetadata =
- new GravitinoMetadata(catalogConnectorMetadata, metadataAdapter,
internalMetadata);
+ new TestGravitinoMetadata(catalogConnectorMetadata, metadataAdapter,
internalMetadata);
Optional<SystemTable> result = gravitinoMetadata.getSystemTable(session,
tableName);
@@ -73,11 +73,20 @@ public class TestGravitinoMetadataGetSystemTable {
.thenReturn(Optional.empty());
GravitinoMetadata gravitinoMetadata =
- new GravitinoMetadata(catalogConnectorMetadata, metadataAdapter,
internalMetadata);
+ new TestGravitinoMetadata(catalogConnectorMetadata, metadataAdapter,
internalMetadata);
Optional<SystemTable> result = gravitinoMetadata.getSystemTable(session,
tableName);
assertFalse(result.isPresent());
verify(internalMetadata).getSystemTable(session, tableName);
}
+
+ private static final class TestGravitinoMetadata extends GravitinoMetadata {
+ private TestGravitinoMetadata(
+ CatalogConnectorMetadata catalogConnectorMetadata,
+ CatalogConnectorMetadataAdapter metadataAdapter,
+ ConnectorMetadata internalMetadata) {
+ super(catalogConnectorMetadata, metadataAdapter, internalMetadata);
+ }
+ }
}
diff --git
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoPlugin.java
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoPlugin.java
deleted file mode 100644
index 044deb527f..0000000000
---
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoPlugin.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.trino.connector;
-
-import com.google.common.collect.ImmutableList;
-import io.trino.spi.connector.ConnectorFactory;
-import org.apache.gravitino.client.GravitinoAdminClient;
-import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager;
-
-public class TestGravitinoPlugin extends GravitinoPlugin {
- private TestGravitinoConnectorFactory factory;
-
- private final GravitinoAdminClient gravitinoClient;
-
- public TestGravitinoPlugin(GravitinoAdminClient gravitinoClient) {
- this.gravitinoClient = gravitinoClient;
- }
-
- @Override
- public Iterable<ConnectorFactory> getConnectorFactories() {
- factory = new TestGravitinoConnectorFactory();
- factory.setGravitinoClient(gravitinoClient);
- return ImmutableList.of(factory);
- }
-
- public CatalogConnectorManager getCatalogConnectorManager() {
- return factory.getCatalogConnectorManager();
- }
-}
diff --git
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/TestCatalogConnectorManager.java
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/TestCatalogConnectorManager.java
new file mode 100644
index 0000000000..f03cf0a474
--- /dev/null
+++
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/TestCatalogConnectorManager.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.trino.connector.catalog;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ConnectorContext;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.trino.connector.GravitinoConfig;
+import org.apache.gravitino.trino.connector.GravitinoErrorCode;
+import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog;
+import org.junit.jupiter.api.Test;
+
+public class TestCatalogConnectorManager {
+
+ @Test
+ public void testSingleMetalakeCatalogNaming() throws Exception {
+ CatalogConnectorManager manager =
+ createManager(
+ ImmutableMap.of(
+ "gravitino.uri",
+ "http://127.0.0.1:8090",
+ "gravitino.metalake",
+ "test",
+ "gravitino.use-single-metalake",
+ "true"));
+
+ assertEquals("memory", manager.getTrinoCatalogName("test", "memory"));
+ }
+
+ @Test
+ public void testMultiMetalakeCatalogNaming() throws Exception {
+ CatalogConnectorManager manager =
+ createManager(
+ ImmutableMap.of(
+ "gravitino.uri",
+ "http://127.0.0.1:8090",
+ "gravitino.metalake",
+ "test",
+ "gravitino.use-single-metalake",
+ "false"));
+
+ assertEquals("\"test.memory\"", manager.getTrinoCatalogName("test",
"memory"));
+ }
+
+ @Test
+ public void testSingleMetalakeRejectsDifferentMetalakeConnector() throws
Exception {
+ CatalogConnectorFactory catalogFactory = createCatalogConnectorFactory();
+ CatalogConnectorManager manager =
+ createManager(
+ catalogFactory,
+ ImmutableMap.of(
+ "gravitino.uri",
+ "http://127.0.0.1:8090",
+ "gravitino.metalake",
+ "test",
+ "gravitino.use-single-metalake",
+ "true"));
+
+ GravitinoConfig connectorConfig =
createConnectorConfig(catalogConfigJson("test", "memory"));
+ assertDoesNotThrow(
+ () -> manager.createCatalogConnectorContext("test0", connectorConfig,
mockContext()));
+
+ GravitinoConfig otherConnectorConfig =
+ createConnectorConfig(catalogConfigJson("test2", "memory"));
+ TrinoException error =
+ assertThrows(
+ TrinoException.class,
+ () ->
+ manager.createCatalogConnectorContext(
+ "test1", otherConnectorConfig, mockContext()));
+ assertEquals(GravitinoErrorCode.GRAVITINO_OPERATION_FAILED.toErrorCode(),
error.getErrorCode());
+ assertTrue(error.getMessage().contains("Failed to create connector"));
+ assertTrue(error.getCause().getMessage().contains("Multiple metalakes are
not supported"));
+ }
+
+ @Test
+ public void testMultiMetalakeAllowsDifferentMetalakeConnector() throws
Exception {
+ CatalogConnectorFactory catalogFactory = createCatalogConnectorFactory();
+ CatalogConnectorManager manager =
+ createManager(
+ catalogFactory,
+ ImmutableMap.of(
+ "gravitino.uri",
+ "http://127.0.0.1:8090",
+ "gravitino.metalake",
+ "test",
+ "gravitino.use-single-metalake",
+ "false"));
+
+ GravitinoConfig connectorConfig =
createConnectorConfig(catalogConfigJson("test", "memory"));
+ assertDoesNotThrow(
+ () -> manager.createCatalogConnectorContext("test0", connectorConfig,
mockContext()));
+
+ GravitinoConfig otherConnectorConfig =
+ createConnectorConfig(catalogConfigJson("test2", "memory"));
+ assertDoesNotThrow(
+ () -> manager.createCatalogConnectorContext("test1",
otherConnectorConfig, mockContext()));
+ }
+
+ @Test
+ public void testSkipCatalogPatterns() throws Exception {
+ CatalogConnectorManager manager =
+ createManager(
+ ImmutableMap.of(
+ "gravitino.uri",
+ "http://127.0.0.1:8090",
+ "gravitino.metalake",
+ "test1",
+ "gravitino.trino.skip-catalog-patterns",
+ "a.*, b1"));
+
+ assertTrue(manager.skipCatalog("a1"));
+ assertTrue(manager.skipCatalog("b1"));
+ assertFalse(manager.skipCatalog("b2"));
+ }
+
+ private CatalogConnectorManager createManager(ImmutableMap<String, String>
configMap)
+ throws Exception {
+ return createManager(createCatalogConnectorFactory(), configMap);
+ }
+
+ private CatalogConnectorManager createManager(
+ CatalogConnectorFactory catalogFactory, ImmutableMap<String, String>
configMap) {
+ CatalogRegister catalogRegister = mock(CatalogRegister.class);
+
+ boolean singleMetalakeMode =
+ configMap.getOrDefault("gravitino.use-single-metalake",
"true").equals("true");
+ CatalogConnectorManager manager =
+ new CatalogConnectorManager(
+ catalogRegister,
+ catalogFactory,
+ singleMetalakeMode
+ ? null
+ : (metalake, catalog) -> String.format("\"%s.%s\"", metalake,
catalog));
+ manager.config(new GravitinoConfig(configMap),
mock(GravitinoAdminClient.class));
+ return manager;
+ }
+
+ private CatalogConnectorFactory createCatalogConnectorFactory() throws
Exception {
+ CatalogConnectorFactory catalogFactory =
mock(CatalogConnectorFactory.class);
+ CatalogConnectorContext.Builder builder =
mock(CatalogConnectorContext.Builder.class);
+
when(catalogFactory.createCatalogConnectorContextBuilder(any())).thenReturn(builder);
+ when(builder.withMetalake(any())).thenReturn(builder);
+ when(builder.withContext(any())).thenReturn(builder);
+ when(builder.build()).thenReturn(mock(CatalogConnectorContext.class));
+ return catalogFactory;
+ }
+
+ private static GravitinoConfig createConnectorConfig(String
catalogConfigJson) {
+ return new GravitinoConfig(
+ ImmutableMap.of(
+ "gravitino.uri",
+ "http://127.0.0.1:8090",
+ "gravitino.metalake",
+ "test",
+ GravitinoConfig.GRAVITINO_DYNAMIC_CONNECTOR,
+ "true",
+ GravitinoConfig.GRAVITINO_DYNAMIC_CONNECTOR_CATALOG_CONFIG,
+ catalogConfigJson));
+ }
+
+ private static String catalogConfigJson(String metalake, String name) throws
Exception {
+ GravitinoCatalog catalog =
+ new GravitinoCatalog(metalake, "memory", name, ImmutableMap.of(), 0L);
+ return GravitinoCatalog.toJson(catalog);
+ }
+
+ private static ConnectorContext mockContext() {
+ return mock(ConnectorContext.class);
+ }
+}
diff --git
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/hive/TestHiveDataTypeConverter.java
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/hive/TestHiveDataTypeConverter.java
index 7bf5c49048..315d3c8a9a 100644
---
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/hive/TestHiveDataTypeConverter.java
+++
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/hive/TestHiveDataTypeConverter.java
@@ -22,7 +22,6 @@ package org.apache.gravitino.trino.connector.catalog.hive;
import io.trino.spi.TrinoException;
import org.apache.gravitino.rel.types.Types;
import org.apache.gravitino.trino.connector.util.GeneralDataTypeTransformer;
-import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -43,7 +42,7 @@ public class TestHiveDataTypeConverter {
io.trino.spi.type.Type charLengthIsOverflow =
io.trino.spi.type.CharType.createCharType(256);
Exception e =
- Assert.assertThrows(
+ Assertions.assertThrows(
TrinoException.class,
() ->
generalDataTypeTransformer.getGravitinoType(charLengthIsOverflow));
Assertions.assertTrue(
@@ -63,7 +62,7 @@ public class TestHiveDataTypeConverter {
io.trino.spi.type.Type varcharLengthIsOverflow =
io.trino.spi.type.VarcharType.createVarcharType(65536);
e =
- Assert.assertThrows(
+ Assertions.assertThrows(
TrinoException.class,
() ->
generalDataTypeTransformer.getGravitinoType(varcharLengthIsOverflow));
Assertions.assertTrue(
diff --git
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/jdbc/postgresql/TestPostgreSQLDataTypeTransformer.java
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/jdbc/postgresql/TestPostgreSQLDataTypeTransformer.java
index 9afe765eb0..831fce11f7 100644
---
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/jdbc/postgresql/TestPostgreSQLDataTypeTransformer.java
+++
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/catalog/jdbc/postgresql/TestPostgreSQLDataTypeTransformer.java
@@ -22,7 +22,7 @@ package
org.apache.gravitino.trino.connector.catalog.jdbc.postgresql;
import org.apache.gravitino.rel.types.Type;
import org.apache.gravitino.rel.types.Types;
import org.apache.gravitino.trino.connector.util.GeneralDataTypeTransformer;
-import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class TestPostgreSQLDataTypeTransformer {
@@ -31,29 +31,29 @@ public class TestPostgreSQLDataTypeTransformer {
public void testTrinoTypeToGravitinoType() {
GeneralDataTypeTransformer generalDataTypeTransformer = new
PostgreSQLDataTypeTransformer();
io.trino.spi.type.Type charTypeWithLengthOne =
io.trino.spi.type.CharType.createCharType(1);
- Assert.assertEquals(
+ Assertions.assertEquals(
generalDataTypeTransformer.getGravitinoType(charTypeWithLengthOne),
Types.FixedCharType.of(1));
io.trino.spi.type.Type charTypeWithLength =
io.trino.spi.type.CharType.createCharType(65536);
- Assert.assertEquals(
+ Assertions.assertEquals(
generalDataTypeTransformer.getGravitinoType(charTypeWithLength),
Types.FixedCharType.of(65536));
io.trino.spi.type.Type varcharType =
io.trino.spi.type.VarcharType.createVarcharType(1);
- Assert.assertEquals(
+ Assertions.assertEquals(
generalDataTypeTransformer.getGravitinoType(varcharType),
Types.VarCharType.of(1));
io.trino.spi.type.Type varcharTypeWithLength =
io.trino.spi.type.VarcharType.createVarcharType(65536);
- Assert.assertEquals(
+ Assertions.assertEquals(
generalDataTypeTransformer.getGravitinoType(varcharTypeWithLength),
Types.VarCharType.of(65536));
io.trino.spi.type.Type varcharTypeWithLength2 =
io.trino.spi.type.VarcharType.createUnboundedVarcharType();
- Assert.assertEquals(
+ Assertions.assertEquals(
generalDataTypeTransformer.getGravitinoType(varcharTypeWithLength2),
Types.StringType.get());
}
@@ -62,7 +62,7 @@ public class TestPostgreSQLDataTypeTransformer {
public void testGravitinoCharToTrinoType() {
GeneralDataTypeTransformer generalDataTypeTransformer = new
PostgreSQLDataTypeTransformer();
Type stringType = Types.StringType.get();
- Assert.assertEquals(
+ Assertions.assertEquals(
generalDataTypeTransformer.getTrinoType(stringType),
io.trino.spi.type.VarcharType.createUnboundedVarcharType());
}
diff --git
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/system/TestGravitinoSystemConnector.java
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/system/TestGravitinoSystemConnector.java
index 372b9f2b88..11b5538b08 100644
---
a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/system/TestGravitinoSystemConnector.java
+++
b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/system/TestGravitinoSystemConnector.java
@@ -21,18 +21,23 @@ package org.apache.gravitino.trino.connector.system;
import io.trino.spi.Page;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
public class TestGravitinoSystemConnector {
@Test
public void testSystemTablePageSourceReturnsPageOnlyOnce() throws Exception {
Page page = new Page(0);
try (GravitinoSystemConnector.SystemTablePageSource pageSource =
- new GravitinoSystemConnector.SystemTablePageSource(page)) {
+ Mockito.mock(
+ GravitinoSystemConnector.SystemTablePageSource.class,
+ Mockito.withSettings()
+ .useConstructor(page)
+ .defaultAnswer(Mockito.CALLS_REAL_METHODS))) {
Assertions.assertFalse(pageSource.isFinished());
- Assertions.assertSame(page, pageSource.getNextPage());
+ Assertions.assertSame(page, pageSource.nextPage());
Assertions.assertTrue(pageSource.isFinished());
- Assertions.assertNull(pageSource.getNextPage());
+ Assertions.assertNull(pageSource.nextPage());
}
}
@@ -40,18 +45,22 @@ public class TestGravitinoSystemConnector {
public void testSystemTablePageSourceMultipleGetNextPageCalls() throws
Exception {
Page page = new Page(0);
try (GravitinoSystemConnector.SystemTablePageSource pageSource =
- new GravitinoSystemConnector.SystemTablePageSource(page)) {
+ Mockito.mock(
+ GravitinoSystemConnector.SystemTablePageSource.class,
+ Mockito.withSettings()
+ .useConstructor(page)
+ .defaultAnswer(Mockito.CALLS_REAL_METHODS))) {
// First call should return the page
- Page firstPage = pageSource.getNextPage();
+ Page firstPage = pageSource.nextPage();
Assertions.assertNotNull(firstPage);
Assertions.assertSame(page, firstPage);
Assertions.assertTrue(pageSource.isFinished());
// Subsequent calls should return null
- Assertions.assertNull(pageSource.getNextPage());
- Assertions.assertNull(pageSource.getNextPage());
- Assertions.assertNull(pageSource.getNextPage());
+ Assertions.assertNull(pageSource.nextPage());
+ Assertions.assertNull(pageSource.nextPage());
+ Assertions.assertNull(pageSource.nextPage());
Assertions.assertTrue(pageSource.isFinished());
}
}