Copilot commented on code in PR #10088:
URL: https://github.com/apache/gravitino/pull/10088#discussion_r2867393855


##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseDatabaseOperations.java:
##########
@@ -130,6 +138,55 @@ protected void dropDatabase(String databaseName, boolean 
cascade) {
     }
   }
 
+  @Override
+  public JdbcSchema load(String databaseName) throws NoSuchSchemaException {
+    if (!exist(databaseName)) {
+      throw new NoSuchSchemaException("Database %s could not be found", 
databaseName);
+    }
+
+    Map<String, String> schemaProperties = new HashMap<>();
+    schemaProperties.put(ClusterConstants.ON_CLUSTER, String.valueOf(false));
+    String schemaComment = "";
+
+    try (Connection connection = getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet =
+            statement.executeQuery(
+                String.format("SHOW CREATE DATABASE `%s`", 
databaseName.replace("`", "``")))) {
+      if (resultSet.next()) {
+        String createSql = resultSet.getString(1);
+        Matcher matcher = 
ON_CLUSTER_PATTERN.matcher(StringUtils.trimToEmpty(createSql));
+        if (matcher.find()) {
+          schemaProperties.put(ClusterConstants.ON_CLUSTER, 
String.valueOf(true));
+          schemaProperties.put(ClusterConstants.CLUSTER_NAME, 
matcher.group(1));
+        } else {
+          String inferredClusterName = 
detectClusterNameFromQueryLog(connection, databaseName);
+          if (StringUtils.isBlank(inferredClusterName)) {
+            inferredClusterName = detectClusterName(connection, databaseName);
+          }
+          if (StringUtils.isNotBlank(inferredClusterName)) {
+            schemaProperties.put(ClusterConstants.ON_CLUSTER, 
String.valueOf(true));
+            schemaProperties.put(ClusterConstants.CLUSTER_NAME, 
inferredClusterName);
+          }
+        }
+
+        Matcher commentMatcher = 
COMMENT_PATTERN.matcher(StringUtils.trimToEmpty(createSql));
+        if (commentMatcher.find()) {
+          schemaComment = commentMatcher.group(1).replace("''", "'");
+        }
+      }
+    } catch (SQLException se) {
+      throw this.exceptionMapper.toGravitinoException(se);

Review Comment:
   Cluster inference in `load()` calls `detectClusterNameFromQueryLog()` / 
`detectClusterName()` which can throw `SQLException` (e.g., missing privileges 
for `system.query_log` / `system.clusters`, or `query_log` disabled). Today any 
such failure aborts `load()` entirely and prevents loading an 
otherwise-existing schema. Consider making cluster inference best-effort (catch 
`SQLException` inside the detect methods or around the calls, log at debug, and 
fall back to `on-cluster=false` / no `cluster-name`).



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseDatabaseOperations.java:
##########
@@ -142,4 +199,80 @@ private boolean onCluster(Map<String, String> 
dbProperties) {
 
     return 
Boolean.parseBoolean(dbProperties.getOrDefault(ClusterConstants.ON_CLUSTER, 
"false"));
   }
+
+  private String detectClusterName(Connection connection, String databaseName) 
throws SQLException {
+    List<String> clusterNames = new ArrayList<>();
+    String escapedDatabaseName = databaseName.replace("'", "''");
+    try (Statement statement = connection.createStatement();
+        ResultSet clusters =
+            statement.executeQuery("SELECT DISTINCT cluster FROM 
system.clusters")) {
+      while (clusters.next()) {
+        clusterNames.add(clusters.getString(1));
+      }
+    }
+
+    for (String clusterName : clusterNames) {
+      String escapedClusterName = clusterName.replace("'", "''");
+      int clusterHostCount;
+      try (Statement statement = connection.createStatement();
+          ResultSet hostCountResultSet =
+              statement.executeQuery(
+                  String.format(
+                      "SELECT countDistinct(host_name) FROM system.clusters 
WHERE cluster = '%s'",
+                      escapedClusterName))) {
+        if (!hostCountResultSet.next()) {
+          continue;
+        }
+        clusterHostCount = hostCountResultSet.getInt(1);
+      }
+      if (clusterHostCount <= 1) {
+        continue;
+      }
+
+      try (Statement statement = connection.createStatement();
+          ResultSet dbHostCountResultSet =
+              statement.executeQuery(
+                  String.format(
+                      "SELECT countDistinct(hostName()) FROM 
clusterAllReplicas('%s', system.databases) "
+                          + "WHERE name = '%s'",
+                      escapedClusterName, escapedDatabaseName))) {
+        if (dbHostCountResultSet.next() && dbHostCountResultSet.getInt(1) == 
clusterHostCount) {
+          return clusterName;
+        }
+      }
+    }
+    return null;
+  }
+
+  private String detectClusterNameFromQueryLog(Connection connection, String 
databaseName)
+      throws SQLException {
+    String escapedDatabaseName = databaseName.replace("'", "''");
+    try (Statement flushLogStatement = connection.createStatement()) {
+      flushLogStatement.execute("SYSTEM FLUSH LOGS");
+    } catch (SQLException e) {
+      LOG.debug("Failed to flush ClickHouse logs before loading schema {}", 
databaseName, e);
+    }
+
+    String querySql =
+        String.format(
+            "SELECT query FROM system.query_log "
+                + "WHERE type = 'QueryFinish' "
+                + "AND is_initial_query = 1 "
+                + "AND query_kind = 'Create' "
+                + "AND startsWith(lower(query), 'create database') "
+                + "AND positionCaseInsensitive(query, '`%s`') > 0 "
+                + "ORDER BY event_time DESC LIMIT 20",
+            escapedDatabaseName);

Review Comment:
   `detectClusterNameFromQueryLog()` builds the ``positionCaseInsensitive(query, 
'`%s`')`` predicate using `databaseName` that is only escaped for single quotes. 
Because the value is injected between backticks, a name containing backticks 
can break the query (and potentially allow SQL injection). Escape backticks as 
well (like the `SHOW CREATE DATABASE` call does) or avoid embedding the 
identifier into the query string.



##########
catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/integration/test/CatalogClickHouseClusterIT.java:
##########
@@ -356,4 +371,66 @@ public void 
testLoadSqlCreatedLocalAndDistributedTableProperties() {
     Assertions.assertEquals(
         sqlNonClusterLocalTableName, 
sqlNonClusterDistributedTable.properties().get(REMOTE_TABLE));
   }
+
+  @Test
+  public void testCreateClusterSchemaByApiAndLoadSqlCreatedClusterDatabase() {
+    final String apiClusterComment = "cluster schema from gravitino api";
+    final String sqlClusterComment = "cluster schema from sql";
+    Schema apiClusterSchema =
+        catalog
+            .asSchemas()
+            .createSchema(apiClusterSchemaName, apiClusterComment, 
clusterSchemaProperties());
+    Schema loadedApiClusterSchema = 
catalog.asSchemas().loadSchema(apiClusterSchemaName);
+    Assertions.assertEquals(apiClusterSchema.name(), 
loadedApiClusterSchema.name());
+    Assertions.assertEquals(apiClusterComment, 
loadedApiClusterSchema.comment());
+    Assertions.assertEquals("true", 
loadedApiClusterSchema.properties().get(ON_CLUSTER));
+    Assertions.assertTrue(
+        
StringUtils.isNotBlank(loadedApiClusterSchema.properties().get(CLUSTER_NAME)));
+
+    clickHouseService.executeQuery(
+        String.format(
+            "CREATE DATABASE `%s` ON CLUSTER `%s` COMMENT 'cluster schema from 
sql'",
+            sqlClusterSchemaName, ClickHouseContainer.DEFAULT_CLUSTER_NAME));
+
+    Schema loadedSqlClusterSchema = 
catalog.asSchemas().loadSchema(sqlClusterSchemaName);
+    Assertions.assertEquals(sqlClusterSchemaName, 
loadedSqlClusterSchema.name());
+    Assertions.assertEquals(sqlClusterComment, 
loadedSqlClusterSchema.comment());
+    Assertions.assertEquals("true", 
loadedSqlClusterSchema.properties().get(ON_CLUSTER));
+    Assertions.assertTrue(
+        
StringUtils.isNotBlank(loadedSqlClusterSchema.properties().get(CLUSTER_NAME)));

Review Comment:
   This test only asserts `cluster-name` is non-blank for cluster schemas. That 
allows false positives (any non-empty value passes) and doesn’t validate that 
the loaded schema actually captured the expected cluster 
(`ClickHouseContainer.DEFAULT_CLUSTER_NAME`) nor that the intended inference 
path is working. Please assert the `cluster-name` equals the expected default 
cluster name (at least for the SQL-created database case), so the test 
genuinely proves the feature behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to