Copilot commented on code in PR #10088:
URL: https://github.com/apache/gravitino/pull/10088#discussion_r2867393855
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseDatabaseOperations.java:
##########
@@ -130,6 +138,55 @@ protected void dropDatabase(String databaseName, boolean
cascade) {
}
}
+ @Override
+ public JdbcSchema load(String databaseName) throws NoSuchSchemaException {
+ if (!exist(databaseName)) {
+ throw new NoSuchSchemaException("Database %s could not be found",
databaseName);
+ }
+
+ Map<String, String> schemaProperties = new HashMap<>();
+ schemaProperties.put(ClusterConstants.ON_CLUSTER, String.valueOf(false));
+ String schemaComment = "";
+
+ try (Connection connection = getConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet =
+ statement.executeQuery(
+ String.format("SHOW CREATE DATABASE `%s`",
databaseName.replace("`", "``")))) {
+ if (resultSet.next()) {
+ String createSql = resultSet.getString(1);
+ Matcher matcher =
ON_CLUSTER_PATTERN.matcher(StringUtils.trimToEmpty(createSql));
+ if (matcher.find()) {
+ schemaProperties.put(ClusterConstants.ON_CLUSTER,
String.valueOf(true));
+ schemaProperties.put(ClusterConstants.CLUSTER_NAME,
matcher.group(1));
+ } else {
+ String inferredClusterName =
detectClusterNameFromQueryLog(connection, databaseName);
+ if (StringUtils.isBlank(inferredClusterName)) {
+ inferredClusterName = detectClusterName(connection, databaseName);
+ }
+ if (StringUtils.isNotBlank(inferredClusterName)) {
+ schemaProperties.put(ClusterConstants.ON_CLUSTER,
String.valueOf(true));
+ schemaProperties.put(ClusterConstants.CLUSTER_NAME,
inferredClusterName);
+ }
+ }
+
+ Matcher commentMatcher =
COMMENT_PATTERN.matcher(StringUtils.trimToEmpty(createSql));
+ if (commentMatcher.find()) {
+ schemaComment = commentMatcher.group(1).replace("''", "'");
+ }
+ }
+ } catch (SQLException se) {
+ throw this.exceptionMapper.toGravitinoException(se);
Review Comment:
Cluster inference in `load()` calls `detectClusterNameFromQueryLog()` /
`detectClusterName()`, which can throw `SQLException` (e.g., missing privileges
for `system.query_log` / `system.clusters`, or `query_log` disabled). Today any
such failure aborts `load()` entirely and prevents loading an
otherwise-existing schema. Consider making cluster inference best-effort (catch
`SQLException` inside the detect methods or around the calls, log it at debug
level, and fall back to `on-cluster=false` / no `cluster-name`).
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseDatabaseOperations.java:
##########
@@ -142,4 +199,80 @@ private boolean onCluster(Map<String, String>
dbProperties) {
return
Boolean.parseBoolean(dbProperties.getOrDefault(ClusterConstants.ON_CLUSTER,
"false"));
}
+
+ private String detectClusterName(Connection connection, String databaseName)
throws SQLException {
+ List<String> clusterNames = new ArrayList<>();
+ String escapedDatabaseName = databaseName.replace("'", "''");
+ try (Statement statement = connection.createStatement();
+ ResultSet clusters =
+ statement.executeQuery("SELECT DISTINCT cluster FROM
system.clusters")) {
+ while (clusters.next()) {
+ clusterNames.add(clusters.getString(1));
+ }
+ }
+
+ for (String clusterName : clusterNames) {
+ String escapedClusterName = clusterName.replace("'", "''");
+ int clusterHostCount;
+ try (Statement statement = connection.createStatement();
+ ResultSet hostCountResultSet =
+ statement.executeQuery(
+ String.format(
+ "SELECT countDistinct(host_name) FROM system.clusters
WHERE cluster = '%s'",
+ escapedClusterName))) {
+ if (!hostCountResultSet.next()) {
+ continue;
+ }
+ clusterHostCount = hostCountResultSet.getInt(1);
+ }
+ if (clusterHostCount <= 1) {
+ continue;
+ }
+
+ try (Statement statement = connection.createStatement();
+ ResultSet dbHostCountResultSet =
+ statement.executeQuery(
+ String.format(
+ "SELECT countDistinct(hostName()) FROM
clusterAllReplicas('%s', system.databases) "
+ + "WHERE name = '%s'",
+ escapedClusterName, escapedDatabaseName))) {
+ if (dbHostCountResultSet.next() && dbHostCountResultSet.getInt(1) ==
clusterHostCount) {
+ return clusterName;
+ }
+ }
+ }
+ return null;
+ }
+
+ private String detectClusterNameFromQueryLog(Connection connection, String
databaseName)
+ throws SQLException {
+ String escapedDatabaseName = databaseName.replace("'", "''");
+ try (Statement flushLogStatement = connection.createStatement()) {
+ flushLogStatement.execute("SYSTEM FLUSH LOGS");
+ } catch (SQLException e) {
+ LOG.debug("Failed to flush ClickHouse logs before loading schema {}",
databaseName, e);
+ }
+
+ String querySql =
+ String.format(
+ "SELECT query FROM system.query_log "
+ + "WHERE type = 'QueryFinish' "
+ + "AND is_initial_query = 1 "
+ + "AND query_kind = 'Create' "
+ + "AND startsWith(lower(query), 'create database') "
+ + "AND positionCaseInsensitive(query, '`%s`') > 0 "
+ + "ORDER BY event_time DESC LIMIT 20",
+ escapedDatabaseName);
Review Comment:
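`detectClusterNameFromQueryLog()` builds the
``positionCaseInsensitive(query, '`%s`')`` predicate using `databaseName`,
which is only escaped for single quotes. Because the value is embedded between
backticks, a name containing backticks can break the query (and potentially
allow SQL injection). Escape backticks as well (as the `SHOW CREATE DATABASE`
call does) or avoid embedding the identifier into the query string. Note also
that ClickHouse string literals treat backslash as an escape character, so
doubling single quotes alone is not sufficient; backslashes must be escaped too
(the same applies to the quoted literals built in `detectClusterName()`).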
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/integration/test/CatalogClickHouseClusterIT.java:
##########
@@ -356,4 +371,66 @@ public void
testLoadSqlCreatedLocalAndDistributedTableProperties() {
Assertions.assertEquals(
sqlNonClusterLocalTableName,
sqlNonClusterDistributedTable.properties().get(REMOTE_TABLE));
}
+
+ @Test
+ public void testCreateClusterSchemaByApiAndLoadSqlCreatedClusterDatabase() {
+ final String apiClusterComment = "cluster schema from gravitino api";
+ final String sqlClusterComment = "cluster schema from sql";
+ Schema apiClusterSchema =
+ catalog
+ .asSchemas()
+ .createSchema(apiClusterSchemaName, apiClusterComment,
clusterSchemaProperties());
+ Schema loadedApiClusterSchema =
catalog.asSchemas().loadSchema(apiClusterSchemaName);
+ Assertions.assertEquals(apiClusterSchema.name(),
loadedApiClusterSchema.name());
+ Assertions.assertEquals(apiClusterComment,
loadedApiClusterSchema.comment());
+ Assertions.assertEquals("true",
loadedApiClusterSchema.properties().get(ON_CLUSTER));
+ Assertions.assertTrue(
+
StringUtils.isNotBlank(loadedApiClusterSchema.properties().get(CLUSTER_NAME)));
+
+ clickHouseService.executeQuery(
+ String.format(
+ "CREATE DATABASE `%s` ON CLUSTER `%s` COMMENT 'cluster schema from
sql'",
+ sqlClusterSchemaName, ClickHouseContainer.DEFAULT_CLUSTER_NAME));
+
+ Schema loadedSqlClusterSchema =
catalog.asSchemas().loadSchema(sqlClusterSchemaName);
+ Assertions.assertEquals(sqlClusterSchemaName,
loadedSqlClusterSchema.name());
+ Assertions.assertEquals(sqlClusterComment,
loadedSqlClusterSchema.comment());
+ Assertions.assertEquals("true",
loadedSqlClusterSchema.properties().get(ON_CLUSTER));
+ Assertions.assertTrue(
+
StringUtils.isNotBlank(loadedSqlClusterSchema.properties().get(CLUSTER_NAME)));
Review Comment:
This test only asserts that `cluster-name` is non-blank for cluster schemas. That
allows false positives (any non-empty value passes) and validates neither that
the loaded schema actually captured the expected cluster
(`ClickHouseContainer.DEFAULT_CLUSTER_NAME`) nor that the intended inference
path is working. Please assert that `cluster-name` equals the expected default
cluster name (at least for the SQL-created database case), so the test
genuinely proves the feature behavior.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]