leonardBang commented on code in PR #23036:
URL: https://github.com/apache/flink/pull/23036#discussion_r1271308957


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##########
@@ -387,9 +389,22 @@ default Table fromValues(AbstractDataType<?> rowType, 
Object... values) {
      *
      * @param catalogName The name under which the catalog will be registered.
      * @param catalog The catalog to register.
+     * @deprecated Use {@link #createCatalog(String, CatalogDescriptor)} 
instead to create a catalog
+     *     using {@link CatalogDescriptor} and store it in the {@link 
CatalogStore}.
      */
+    @Deprecated
     void registerCatalog(String catalogName, Catalog catalog);
 
+    /**
+     * Creates a {@link Catalog} using the provided {@link CatalogDescriptor}. 
All table registered
+     * in the {@link Catalog} can be accessed. The {@link CatalogDescriptor} 
will be persisted into
+     * the {@link CatalogStore}

Review Comment:
   Add a period ?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##########
@@ -387,9 +389,22 @@ default Table fromValues(AbstractDataType<?> rowType, 
Object... values) {
      *
      * @param catalogName The name under which the catalog will be registered.
      * @param catalog The catalog to register.
+     * @deprecated Use {@link #createCatalog(String, CatalogDescriptor)} 
instead to create a catalog
+     *     using {@link CatalogDescriptor} and store it in the {@link 
CatalogStore}.

Review Comment:
   I didn't understand the note, could you have a check? 



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CommonCatalogOptions.java:
##########
@@ -41,4 +41,13 @@ public class CommonCatalogOptions {
      */
     public static final ConfigOption<String> CATALOG_TYPE =
             ConfigOptions.key("type").stringType().noDefaultValue();
+
+    public static final ConfigOption<String> TABLE_CATALOG_STORE_KIND =
+            ConfigOptions.key("table.catalog-store.kind")
+                    .stringType()
+                    .defaultValue("generic_in_memory")

Review Comment:
   public static final String DEFAULT_XXX,  see 
GenericInMemoryCatalogFactoryOptions, you can defined this in 
GenericInMemoryCatalogStoreFactoryOptions



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java:
##########
@@ -196,4 +204,92 @@ public ClassLoader getUserClassLoader() {
                                                 }))
                 .collect(Collectors.toList());
     }
+
+    /**
+     * Finds and creates a {@link CatalogStore} using the provided {@link 
Configuration} and user
+     * classloader.
+     *
+     * <p>The configuration format should be as follows:
+     *
+     * <pre>{@code
+     * table.catalog-store.kind: {identifier}
+     * table.catalog-store.{identifier}.{param1}: xxx
+     * table.catalog-store.{identifier}.{param2}: xxx
+     * }</pre>
+     */
+    public static CatalogStore findAndCreateCatalogStore(
+            Configuration configuration, ClassLoader classLoader) {
+        String identifier = 
configuration.getString(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND);
+        String catalogStoreOptionPrefix =
+                CommonCatalogOptions.TABLE_CATALOG_STORE_OPTION_PREFIX + 
identifier + ".";
+        Map<String, String> options =
+                new DelegatingConfiguration(configuration, 
catalogStoreOptionPrefix).toMap();
+        CatalogStoreFactory catalogStoreFactory =
+                FactoryUtil.discoverFactory(classLoader, 
CatalogStoreFactory.class, identifier);
+        CatalogStoreFactory.Context context =
+                new FactoryUtil.DefaultCatalogStoreContext(options, 
configuration, classLoader);
+        catalogStoreFactory.open(context);
+        return CatalogStoreWithFactory.of(
+                catalogStoreFactory.createCatalogStore(context), 
catalogStoreFactory);
+    }
+
+    /**
+     * A wrapper class for {@link CatalogStore} that includes a {@link 
CatalogStoreFactory}.
+     *
+     * <p>This class can be used by users to close both the {@link 
CatalogStore} and {@link

Review Comment:
   Is this private class designed to use by `users`? 



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##########
@@ -387,9 +389,22 @@ default Table fromValues(AbstractDataType<?> rowType, 
Object... values) {
      *
      * @param catalogName The name under which the catalog will be registered.
      * @param catalog The catalog to register.
+     * @deprecated Use {@link #createCatalog(String, CatalogDescriptor)} 
instead to create a catalog
+     *     using {@link CatalogDescriptor} and store it in the {@link 
CatalogStore}.
      */
+    @Deprecated
     void registerCatalog(String catalogName, Catalog catalog);
 
+    /**
+     * Creates a {@link Catalog} using the provided {@link CatalogDescriptor}. 
All table registered
+     * in the {@link Catalog} can be accessed. The {@link CatalogDescriptor} 
will be persisted into
+     * the {@link CatalogStore}

Review Comment:
   Add a period ?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -180,20 +195,57 @@ public Builder catalogModificationListeners(
             return this;
         }
 
+        public Builder catalogStore(CatalogStore catalogStore) {
+            this.catalogStore = catalogStore;
+            return this;
+        }
+
         public CatalogManager build() {
             checkNotNull(classLoader, "Class loader cannot be null");
             checkNotNull(config, "Config cannot be null");
-            return new CatalogManager(
-                    defaultCatalogName,
-                    defaultCatalog,
-                    dataTypeFactory != null
-                            ? dataTypeFactory
-                            : new DataTypeFactoryImpl(classLoader, config, 
executionConfig),
-                    new ManagedTableListener(classLoader, config),
-                    catalogModificationListeners);
+            checkNotNull(config, "CatalogStore cannot be null");

Review Comment:
   Duplicated code?   
   The check conflicts with `@Nullable CatalogStore catalogStore`, nullable 
member do not need the check, otherwise it should not defined as `@Nullable`
   



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -96,12 +97,21 @@ public final class CatalogManager implements 
CatalogRegistry {
 
     private final List<CatalogModificationListener> 
catalogModificationListeners;
 
+    private final CatalogStore catalogStore;
+
+    private final ClassLoader classLoader;

Review Comment:
   IIUC, the classLoader is only used to build a catalogStore, it should be 
closed when the catalogStore closing



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -180,20 +195,57 @@ public Builder catalogModificationListeners(
             return this;
         }
 
+        public Builder catalogStore(CatalogStore catalogStore) {
+            this.catalogStore = catalogStore;
+            return this;
+        }
+
         public CatalogManager build() {
             checkNotNull(classLoader, "Class loader cannot be null");
             checkNotNull(config, "Config cannot be null");
-            return new CatalogManager(
-                    defaultCatalogName,
-                    defaultCatalog,
-                    dataTypeFactory != null
-                            ? dataTypeFactory
-                            : new DataTypeFactoryImpl(classLoader, config, 
executionConfig),
-                    new ManagedTableListener(classLoader, config),
-                    catalogModificationListeners);
+            checkNotNull(config, "CatalogStore cannot be null");
+            CatalogManager catalogManager =
+                    new CatalogManager(
+                            defaultCatalogName,
+                            defaultCatalog,
+                            dataTypeFactory != null
+                                    ? dataTypeFactory
+                                    : new DataTypeFactoryImpl(classLoader, 
config, executionConfig),
+                            new ManagedTableListener(classLoader, config),
+                            catalogModificationListeners,
+                            catalogStore,
+                            classLoader,
+                            config);
+            catalogManager.open();
+            return catalogManager;
         }
     }
 
+    /**
+     * Initializes the catalog manager resource.
+     *
+     * <p>This method initializes the {@link CatalogStore}.
+     *
+     * @throws CatalogException if an error occurs while initializing the 
CatalogStore.
+     */
+    public void open() throws CatalogException {
+        catalogStore.open();
+    }
+
+    /**
+     * Closes the CatalogManager.

Review Comment:
    Closes the catalog manager and releases its resources.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -322,14 +320,17 @@ private static CatalogManager buildCatalogManager(
             Configuration configuration,
             URLClassLoader userClassLoader,
             SessionEnvironment environment) {
+        CatalogStore catalogStore =

Review Comment:
   could be `final`



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java:
##########
@@ -196,4 +204,92 @@ public ClassLoader getUserClassLoader() {
                                                 }))
                 .collect(Collectors.toList());
     }
+
+    /**
+     * Finds and creates a {@link CatalogStore} using the provided {@link 
Configuration} and user
+     * classloader.
+     *
+     * <p>The configuration format should be as follows:
+     *
+     * <pre>{@code
+     * table.catalog-store.kind: {identifier}
+     * table.catalog-store.{identifier}.{param1}: xxx
+     * table.catalog-store.{identifier}.{param2}: xxx
+     * }</pre>
+     */
+    public static CatalogStore findAndCreateCatalogStore(
+            Configuration configuration, ClassLoader classLoader) {
+        String identifier = 
configuration.getString(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND);
+        String catalogStoreOptionPrefix =
+                CommonCatalogOptions.TABLE_CATALOG_STORE_OPTION_PREFIX + 
identifier + ".";
+        Map<String, String> options =
+                new DelegatingConfiguration(configuration, 
catalogStoreOptionPrefix).toMap();
+        CatalogStoreFactory catalogStoreFactory =
+                FactoryUtil.discoverFactory(classLoader, 
CatalogStoreFactory.class, identifier);
+        CatalogStoreFactory.Context context =
+                new FactoryUtil.DefaultCatalogStoreContext(options, 
configuration, classLoader);
+        catalogStoreFactory.open(context);

Review Comment:
   Why we open a resource in a static Util method? the lifecycle of the  
CatalogStoreFactory should be controlled by the real end user instead of a Util 
Class, right ?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -203,15 +204,12 @@ public void addStatementSetOperation(ModifyOperation 
operation) {
     /** Close resources, e.g. catalogs. */
     public void close() {
         operationManager.close();
-        for (String name : sessionState.catalogManager.listCatalogs()) {
-            try {
-                
sessionState.catalogManager.getCatalog(name).ifPresent(Catalog::close);
-            } catch (Throwable t) {
-                LOG.error(
-                        String.format(
-                                "Failed to close catalog %s for the session 
%s.", name, sessionId),
-                        t);

Review Comment:
   Please add error log for each catalog#close



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java:
##########
@@ -196,4 +204,92 @@ public ClassLoader getUserClassLoader() {
                                                 }))
                 .collect(Collectors.toList());
     }
+
+    /**
+     * Finds and creates a {@link CatalogStore} using the provided {@link 
Configuration} and user
+     * classloader.
+     *
+     * <p>The configuration format should be as follows:
+     *
+     * <pre>{@code
+     * table.catalog-store.kind: {identifier}
+     * table.catalog-store.{identifier}.{param1}: xxx
+     * table.catalog-store.{identifier}.{param2}: xxx
+     * }</pre>
+     */
+    public static CatalogStore findAndCreateCatalogStore(
+            Configuration configuration, ClassLoader classLoader) {
+        String identifier = 
configuration.getString(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND);
+        String catalogStoreOptionPrefix =
+                CommonCatalogOptions.TABLE_CATALOG_STORE_OPTION_PREFIX + 
identifier + ".";
+        Map<String, String> options =
+                new DelegatingConfiguration(configuration, 
catalogStoreOptionPrefix).toMap();
+        CatalogStoreFactory catalogStoreFactory =
+                FactoryUtil.discoverFactory(classLoader, 
CatalogStoreFactory.class, identifier);
+        CatalogStoreFactory.Context context =
+                new FactoryUtil.DefaultCatalogStoreContext(options, 
configuration, classLoader);
+        catalogStoreFactory.open(context);
+        return CatalogStoreWithFactory.of(
+                catalogStoreFactory.createCatalogStore(context), 
catalogStoreFactory);
+    }
+
+    /**
+     * A wrapper class for {@link CatalogStore} that includes a {@link 
CatalogStoreFactory}.
+     *
+     * <p>This class can be used by users to close both the {@link 
CatalogStore} and {@link
+     * CatalogStoreFactory} instances.
+     */
+    private static class CatalogStoreWithFactory implements CatalogStore {

Review Comment:
   oh, A XXFactory implements  XX interface, this class really confused me a 
lot. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to