Copilot commented on code in PR #10560:
URL: https://github.com/apache/gravitino/pull/10560#discussion_r3008269015


##########
flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoSessionCatalogStore.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.store;
+
+import static 
org.apache.gravitino.flink.connector.utils.FactoryUtils.isBuiltInCatalog;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.junit.Before;
+import org.junit.Test;
+

Review Comment:
   This new test uses JUnit 4 (`org.junit.*` / `@Before` / `@Test`). The build 
is configured to run tests on the JUnit Platform (`useJUnitPlatform()`); 
without the JUnit Vintage engine these tests will not execute, which undermines 
the coverage claimed in the PR description. Please migrate this test to JUnit 
Jupiter (`org.junit.jupiter.api.*`) or add the Vintage engine explicitly to the 
test runtime if JUnit 4 tests are intended to run.



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/FactoryUtils.java:
##########
@@ -36,6 +41,19 @@ private FactoryUtils() {}
 
   private static final Logger LOG = 
LoggerFactory.getLogger(FactoryUtils.class);
 
+  /** The set of catalog type identifiers managed by Gravitino. */
+  public static final ImmutableSet<String> GRAVITINO_CATALOG_TYPES =
+      ImmutableSet.of(
+          GravitinoHiveCatalogFactoryOptions.IDENTIFIER,
+          GravitinoIcebergCatalogFactoryOptions.IDENTIFIER,
+          GravitinoJdbcCatalogFactoryOptions.MYSQL_IDENTIFIER,
+          GravitinoJdbcCatalogFactoryOptions.POSTGRESQL_IDENTIFIER,
+          GravitinoPaimonCatalogFactoryOptions.IDENTIFIER);
+
+  public static boolean isBuiltInCatalog(String type) {
+    return GRAVITINO_CATALOG_TYPES.contains(type);
+  }
+

Review Comment:
   `GRAVITINO_FACTORY_LIST` is a `public static final` Guava collection, which 
exposes an internal implementation detail as part of the public API surface of 
`FactoryUtils`. Prefer keeping this collection `private` (and exposing only 
`isBuiltInCatalog`), or expose it as a `java.util.Set<String>` to avoid leaking 
Guava types to downstream users.



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoSessionCatalogStore.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.store;
+
+import static 
org.apache.gravitino.flink.connector.utils.FactoryUtils.isBuiltInCatalog;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.flink.table.catalog.AbstractCatalogStore;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A catalog store that combines a session-scoped in-memory {@link 
GenericInMemoryCatalogStore} with
+ * a persistent {@link GravitinoCatalogStore}.
+ *
+ * <p>Catalogs for built-in catalog types are stored only in the in-memory 
store, while all other
+ * catalogs are stored in the Gravitino-backed store. When retrieving, 
listing, or removing
+ * catalogs, entries in the in-memory store take precedence over entries in 
the Gravitino-backed
+ * store.
+ *
+ * <p>This store is intended to be used per Flink session, keeping transient 
catalogs in memory
+ * while delegating persistent catalogs to Apache Gravitino.
+ */
+public class GravitinoSessionCatalogStore extends AbstractCatalogStore {
+  private final GenericInMemoryCatalogStore memoryCatalogStore;
+  private final GravitinoCatalogStore gravitinoCatalogStore;
+
+  public GravitinoSessionCatalogStore(
+      GravitinoCatalogStore gravitinoCatalogStore, GenericInMemoryCatalogStore 
memoryCatalogStore) {
+    this.gravitinoCatalogStore =
+        Preconditions.checkNotNull(gravitinoCatalogStore, "CatalogStore cannot 
be null");
+    this.memoryCatalogStore =
+        Preconditions.checkNotNull(memoryCatalogStore, "MemoryCatalogStore 
cannot be null");
+  }
+
+  @Override
+  public void storeCatalog(String catalogName, CatalogDescriptor descriptor)
+      throws CatalogException {
+    String catalogType = 
descriptor.getConfiguration().get(CommonCatalogOptions.CATALOG_TYPE);
+    if (!isBuiltInCatalog(catalogType)) {
+      memoryCatalogStore.storeCatalog(catalogName, descriptor);
+    } else {
+      gravitinoCatalogStore.storeCatalog(catalogName, descriptor);
+    }

Review Comment:
   `storeCatalog` reads the catalog type via 
`descriptor.getConfiguration().get(CommonCatalogOptions.CATALOG_TYPE)` but does 
not validate it is present. If the type is missing, the code will treat it as a 
session catalog and store it only in memory, which is likely unintended. 
Consider validating `catalogType` (e.g., precondition/argument check) and 
throwing a clear `CatalogException` when it is absent.



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactory.java:
##########
@@ -43,10 +45,18 @@
 /** The Factory for creating {@link GravitinoCatalogStore}. */
 public class GravitinoCatalogStoreFactory implements CatalogStoreFactory {
   private GravitinoCatalogManager catalogManager;
+  private boolean supportSessionCatalog;
+  private GenericInMemoryCatalogStore memoryCatalogStore;
 
   @Override
   public CatalogStore createCatalogStore() {
-    return new GravitinoCatalogStore(catalogManager);
+    GravitinoCatalogStore gravitinoCatalogStore = new 
GravitinoCatalogStore(catalogManager);
+    if (supportSessionCatalog) {
+      memoryCatalogStore = new GenericInMemoryCatalogStore();
+      memoryCatalogStore.open();
+      return new GravitinoSessionCatalogStore(gravitinoCatalogStore, 
memoryCatalogStore);
+    }
+    return gravitinoCatalogStore;

Review Comment:
   `createCatalogStore()` overwrites `memoryCatalogStore` and calls `open()` 
every time session support is enabled. If Flink (or tests) invokes 
`createCatalogStore()` more than once on the same factory instance, the 
previous in-memory store instance will be leaked and never closed. Consider 
initializing the in-memory store once (e.g., lazily create only when 
`memoryCatalogStore == null`), or close/reuse the existing instance.
   ```suggestion
       if (!supportSessionCatalog) {
         return gravitinoCatalogStore;
       }
   
       if (memoryCatalogStore == null) {
         memoryCatalogStore = new GenericInMemoryCatalogStore();
         memoryCatalogStore.open();
       }
   
       return new GravitinoSessionCatalogStore(gravitinoCatalogStore, 
memoryCatalogStore);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to