This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new c98c32c7c6 [#7288] feat (trino-connector) Support automatic loading of
multiple metalakes in the Gravitino Trino connector (#7402)
c98c32c7c6 is described below
commit c98c32c7c67b7e1f03924399c2fc804ac36f92c0
Author: Yuhui <[email protected]>
AuthorDate: Tue Jun 17 00:12:46 2025 +0800
[#7288] feat (trino-connector) Support automatic loading of multiple
metalakes in the Gravitino Trino connector (#7402)
### What changes were proposed in this pull request?
Support automatic loading of multiple metalakes in the Gravitino Trino
connector
### Why are the changes needed?
Fix: #7288
### Does this PR introduce _any_ user-facing change?
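Yes. It adds the `gravitino.use-single-metalake` and
`gravitino.metadata.refresh-interval-seconds` configuration properties and
deprecates `gravitino.simplify-catalog-names`, which is no longer read. When
`gravitino.use-single-metalake` is false, the connector automatically loads all
metalakes from the Gravitino server. On Trino versions below the minimum that
supports catalog names with dots, it now logs a warning instead of failing.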
### How was this patch tested?
Manually
---
docs/trino-connector/configuration.md | 15 +++----
.../gravitino/trino/connector/GravitinoConfig.java | 29 +++++++++++--
.../trino/connector/GravitinoConnectorFactory.java | 6 ---
.../connector/catalog/CatalogConnectorManager.java | 48 +++++++++++-----------
.../trino/connector/catalog/CatalogRegister.java | 12 +++---
5 files changed, 64 insertions(+), 46 deletions(-)
diff --git a/docs/trino-connector/configuration.md
b/docs/trino-connector/configuration.md
index ba2ac18f5f..3c5b1a6858 100644
--- a/docs/trino-connector/configuration.md
+++ b/docs/trino-connector/configuration.md
@@ -5,10 +5,11 @@ keyword: gravitino connector trino
license: "This software is licensed under the Apache License version 2."
---
-| Property | Type | Default Value |
Description
| Required
| Since Version |
-|----------------------------------|---------|-----------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------|
-| connector.name | string | (none) |
The `connector.name` defines the type of Trino connector, this value is always
'gravitino'.
| Yes
| 0.2.0 |
-| gravitino.metalake | string | (none) |
The `gravitino.metalake` defines which metalake in Gravitino server the Trino
connector uses. Trino connector should set it at start, the value of
`gravitino.metalake` needs to be a valid name, Trino connector can detect and
load the metalake with catalogs, schemas and tables once created and keep in
sync. | Yes | 0.2.0 |
-| gravitino.uri | string | http://localhost:8090 |
The `gravitino.uri` defines the connection URL of the Gravitino server, the
default value is `http://localhost:8090`. Trino connector can detect and
connect to Gravitino server once it is ready, no need to start Gravitino server
beforehand.
| No | 0.2.0 |
-| trino.jdbc.user | string | admin |
The jdbc user name of current Trino.
| NO
| 0.5.1 |
-| trino.jdbc.password | string | (none) |
The jdbc password of current Trino.
| NO
| 0.5.1 |
+| Property | Type | Default Value
| Description
|
Required | Since Version |
+|---------------------------------------------|---------|-----------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------|
+| connector.name | string | (none)
| The `connector.name` defines the type of Trino connector; this value is
always 'gravitino'.
|
Yes | 0.2.0 |
+| gravitino.metalake | string | (none)
| The `gravitino.metalake` defines which metalake in the Gravitino server the
Trino connector uses. It must be set at startup and must be a valid metalake
name. The Trino connector detects and loads the metalake's catalogs, schemas,
and tables once they are created and keeps them in sync. When
`gravitino.use-single-metalake` is `false`, the connector loads all metalakes
instead. | Yes | 0.2.0 |
+| gravitino.uri | string |
http://localhost:8090 | The `gravitino.uri` defines the connection URL of the
Gravitino server; the default value is `http://localhost:8090`. The Trino
connector can detect and connect to the Gravitino server once it is ready, so
there is no need to start the Gravitino server beforehand.
| No | 0.2.0 |
+| trino.jdbc.user | string | admin
| The JDBC user name of the current Trino.
| No
| 0.5.1 |
+| trino.jdbc.password | string | (none)
| The JDBC password of the current Trino.
| No
| 0.5.1 |
+| gravitino.metadata.refresh-interval-seconds | integer | 10
| The `gravitino.metadata.refresh-interval-seconds` defines the interval, in
seconds, at which metadata is refreshed from the Gravitino server; the default
value is 10 seconds.
| No | 0.9.0 |
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConfig.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConfig.java
index 5f192fed9b..77bca07ee5 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConfig.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConfig.java
@@ -62,10 +62,20 @@ public class GravitinoConfig {
private static final ConfigEntry GRAVITINO_METALAKE =
new ConfigEntry("gravitino.metalake", "The metalake name for used", "",
true);
+ @Deprecated
+ @SuppressWarnings("UnusedVariable")
private static final ConfigEntry GRAVITINO_SIMPLIFY_CATALOG_NAMES =
new ConfigEntry(
"gravitino.simplify-catalog-names",
- "Omit metalake prefix for catalog names",
+ "Omit metalake prefix for catalog names, is deprecated, use
gravitino.use-single-metalake instead",
+ "true",
+ false);
+
+ private static final ConfigEntry GRAVITINO_SINGLE_METALAKE_MODE =
+ new ConfigEntry(
+ "gravitino.use-single-metalake",
+ "If true, only one metalake is supported in this connector; identify
the catalog by <catalog_name>. "
+ + "If false, multiple metalakes are supported; identify the
catalog by <metalake_name>.<catalog_name>.",
"true",
false);
@@ -89,6 +99,13 @@ public class GravitinoConfig {
private static final ConfigEntry TRINO_JDBC_PASSWORD =
new ConfigEntry("trino.jdbc.password", "The jdbc user password of
Trino", "", false);
+ private static final ConfigEntry GRAVITINO_METADATA_REFRESH_INTERVAL_SECOND =
+ new ConfigEntry(
+ "gravitino.metadata.refresh-interval-seconds",
+ "The interval in seconds to refresh the metadata from Gravitino
server",
+ "10",
+ false);
+
public GravitinoConfig(Map<String, String> requiredConfig) {
config = requiredConfig;
for (Map.Entry<String, ConfigEntry> entry : CONFIG_DEFINITIONS.entrySet())
{
@@ -114,10 +131,10 @@ public class GravitinoConfig {
return config.getOrDefault(GRAVITINO_METALAKE.key,
GRAVITINO_METALAKE.defaultValue);
}
- public boolean simplifyCatalogNames() {
+ public boolean singleMetalakeMode() {
return Boolean.parseBoolean(
config.getOrDefault(
- GRAVITINO_SIMPLIFY_CATALOG_NAMES.key,
GRAVITINO_SIMPLIFY_CATALOG_NAMES.defaultValue));
+ GRAVITINO_SINGLE_METALAKE_MODE.key,
GRAVITINO_SINGLE_METALAKE_MODE.defaultValue));
}
boolean isDynamicConnector() {
@@ -191,6 +208,12 @@ public class GravitinoConfig {
return StringUtils.join(stringList, ',');
}
+ public String getMetadataRefreshIntervalSecond() {
+ return config.getOrDefault(
+ GRAVITINO_METADATA_REFRESH_INTERVAL_SECOND.key,
+ GRAVITINO_METADATA_REFRESH_INTERVAL_SECOND.defaultValue);
+ }
+
static class ConfigEntry {
final String key;
final String description;
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnectorFactory.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnectorFactory.java
index 9eea4ce56f..eeb5831f06 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnectorFactory.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnectorFactory.java
@@ -108,12 +108,6 @@ public class GravitinoConnectorFactory implements
ConnectorFactory {
throw new TrinoException(
GravitinoErrorCode.GRAVITINO_METALAKE_NOT_EXISTS, "No gravitino
metalake selected");
}
- if (config.simplifyCatalogNames() &&
!catalogConnectorManager.getUsedMetalakes().isEmpty()) {
- throw new TrinoException(
- GravitinoErrorCode.GRAVITINO_MISSING_CONFIG,
- "Multiple metalakes are not supported when setting
gravitino.simplify-catalog-names = true");
- }
- catalogConnectorManager.addMetalake(metalake);
GravitinoStoredProcedureFactory gravitinoStoredProcedureFactory =
new GravitinoStoredProcedureFactory(catalogConnectorManager,
metalake);
return new GravitinoSystemConnector(gravitinoStoredProcedureFactory);
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java
index 46a132821f..4f99a16fc1 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogConnectorManager.java
@@ -58,10 +58,11 @@ import org.slf4j.LoggerFactory;
public class CatalogConnectorManager {
private static final Logger LOG =
LoggerFactory.getLogger(CatalogConnectorManager.class);
- private static final int CATALOG_LOAD_FREQUENCY_SECOND = 10;
private static final int NUMBER_EXECUTOR_THREAD = 1;
private static final int LOAD_METALAKE_TIMEOUT = 60;
+ private int metadataUpdateIntervalSecond = 10;
+
private final ScheduledExecutorService executorService;
private final CatalogRegister catalogRegister;
private final CatalogConnectorFactory catalogConnectorFactory;
@@ -69,7 +70,7 @@ public class CatalogConnectorManager {
private final ConcurrentHashMap<String, CatalogConnectorContext>
catalogConnectors =
new ConcurrentHashMap<>();
- private final Set<String> usedMetalakes = new HashSet<>();
+ private String targetMetalake;
private final Map<String, GravitinoMetalake> metalakes = new
ConcurrentHashMap<>();
private GravitinoAdminClient gravitinoClient;
@@ -101,6 +102,8 @@ public class CatalogConnectorManager {
} else {
this.gravitinoClient = client;
}
+ this.metadataUpdateIntervalSecond =
Integer.parseInt(config.getMetadataRefreshIntervalSecond());
+ this.targetMetalake = config.getMetalake();
}
public void start(ConnectorContext context) throws Exception {
@@ -108,8 +111,8 @@ public class CatalogConnectorManager {
if (catalogRegister.isCoordinator()) {
executorService.scheduleWithFixedDelay(
this::loadMetalake,
- CATALOG_LOAD_FREQUENCY_SECOND,
- CATALOG_LOAD_FREQUENCY_SECOND,
+ metadataUpdateIntervalSecond,
+ metadataUpdateIntervalSecond,
TimeUnit.SECONDS);
}
@@ -123,10 +126,21 @@ public class CatalogConnectorManager {
return;
}
+ Set<String> usedMetalakes = new HashSet<>();
+ if (config.singleMetalakeMode()) {
+ usedMetalakes.add(targetMetalake);
+ metalakes.computeIfAbsent(targetMetalake, this::retrieveMetalake);
+ } else {
+ GravitinoMetalake[] allMetalakes = gravitinoClient.listMetalakes();
+ for (GravitinoMetalake metalake : allMetalakes) {
+ usedMetalakes.add(metalake.name());
+ metalakes.put(metalake.name(), metalake);
+ }
+ }
+
for (String usedMetalake : usedMetalakes) {
try {
- GravitinoMetalake metalake =
- metalakes.computeIfAbsent(usedMetalake, this::retrieveMetalake);
+ GravitinoMetalake metalake = metalakes.get(usedMetalake);
LOG.debug("Load metalake: {}", usedMetalake);
loadCatalogs(metalake);
} catch (Exception e) {
@@ -139,7 +153,7 @@ public class CatalogConnectorManager {
}
}
- private GravitinoMetalake retrieveMetalake(String metalakeName) {
+ public GravitinoMetalake retrieveMetalake(String metalakeName) {
try {
return gravitinoClient.loadMetalake(metalakeName);
} catch (NoSuchMetalakeException e) {
@@ -166,8 +180,7 @@ public class CatalogConnectorManager {
// Delete those catalogs that have been deleted in Gravitino server
Set<String> catalogNameStrings =
Arrays.stream(catalogNames)
- .map(
- id -> config.simplifyCatalogNames() ? id :
getTrinoCatalogName(metalake.name(), id))
+ .map(id -> config.singleMetalakeMode() ? id :
getTrinoCatalogName(metalake.name(), id))
.collect(Collectors.toSet());
for (Map.Entry<String, CatalogConnectorContext> entry :
catalogConnectors.entrySet()) {
@@ -264,23 +277,15 @@ public class CatalogConnectorManager {
}
public String getTrinoCatalogName(String metalake, String catalog) {
- return config.simplifyCatalogNames() ? catalog :
String.format("\"%s.%s\"", metalake, catalog);
+ return config.singleMetalakeMode() ? catalog : String.format("\"%s.%s\"",
metalake, catalog);
}
public String getTrinoCatalogName(GravitinoCatalog catalog) {
return getTrinoCatalogName(catalog.getMetalake(), catalog.getName());
}
- public void addMetalake(String metalake) {
- if (config.simplifyCatalogNames() && usedMetalakes.size() > 1)
- throw new TrinoException(
- GravitinoErrorCode.GRAVITINO_MISSING_CONFIG,
- "Multiple metalakes are not supported when setting
gravitino.simplify-catalog-names = true");
- usedMetalakes.add(metalake);
- }
-
public Set<String> getUsedMetalakes() {
- return usedMetalakes;
+ return metalakes.keySet();
}
public Connector createConnector(
@@ -314,11 +319,6 @@ public class CatalogConnectorManager {
}
public GravitinoMetalake getMetalake(String metalake) {
- if (!usedMetalakes.contains(metalake)) {
- throw new TrinoException(
- GravitinoErrorCode.GRAVITINO_OPERATION_FAILED,
- "This connector does not allowed to access metalake " + metalake);
- }
return metalakes.computeIfAbsent(metalake, this::retrieveMetalake);
}
}
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogRegister.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogRegister.java
index a305ded307..8ccf8e8df9 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogRegister.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/CatalogRegister.java
@@ -77,14 +77,14 @@ public class CatalogRegister {
private void checkSupportCatalogNameWithMetalake(
ConnectorContext context, GravitinoConfig config) {
- if (!config.simplifyCatalogNames()) {
+ if (!config.singleMetalakeMode()) {
int version = Integer.parseInt(context.getSpiVersion());
if (version < MIN_SUPPORT_CATALOG_NAME_WITH_METALAKE_TRINO_SPI_VERSION) {
- String errmsg =
- String.format(
- "Trino-%s does not support catalog name with dots, The minimal
required version is Trino-%d",
- trinoVersion,
MIN_SUPPORT_CATALOG_NAME_WITH_METALAKE_TRINO_SPI_VERSION);
- throw new
TrinoException(GravitinoErrorCode.GRAVITINO_UNSUPPORTED_TRINO_VERSION, errmsg);
+ LOG.warn(
+ "Trino-{} does not support catalog name with dots, The minimal
required version is Trino-{}."
+ + "Some errors may occur when using the USE <CATALOG>.<SCHEMA>
statement in Trino",
+ trinoVersion,
+ MIN_SUPPORT_CATALOG_NAME_WITH_METALAKE_TRINO_SPI_VERSION);
}
}
}