fsk119 commented on code in PR #20714:
URL: https://github.com/apache/flink/pull/20714#discussion_r963408538


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -269,6 +238,48 @@ public static SessionContext create(
     // Helpers
     // ------------------------------------------------------------------------------------------------------------------
 
+    private static CatalogManager buildCatalogManager(
+            Configuration configuration,
+            URLClassLoader userClassLoader,
+            SessionEnvironment environment) {
+        CatalogManager.Builder builder =
+                CatalogManager.newBuilder()
+                        // Currently, the classloader is only used by DataTypeFactory.
+                        .classLoader(userClassLoader)
+                        .config(configuration);
+
+        // init default catalog
+        String defaultCatalogName;
+        Catalog defaultCatalog;
+        if (environment.getDefaultCatalog().isPresent()) {
+            defaultCatalogName = environment.getDefaultCatalog().get();
+            defaultCatalog = environment.getRegisteredCatalogs().get(defaultCatalogName);
+        } else {
+            EnvironmentSettings settings =
+                    EnvironmentSettings.newInstance().withConfiguration(configuration).build();
+            defaultCatalogName = settings.getBuiltInCatalogName();

Review Comment:
   It would be better to also check whether `defaultCatalogName` already exists 
in the `SessionEnvironment`, i.e. whether a registered catalog reuses the 
built-in default catalog name.
   
   ```
            if (environment.getRegisteredCatalogs().containsKey(defaultCatalogName)) {
                throw new SqlGatewayException(
                        String.format(
                                "The name of the registered catalog conflicts with the built-in default catalog name: %s.",
                                defaultCatalogName));
            }
   ```
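
For illustration, the check suggested above can be exercised in isolation. The following is only a minimal sketch under stated assumptions: `DefaultCatalogCheck` and `resolveDefaultCatalogName` are hypothetical names, a plain `Map` stands in for `environment.getRegisteredCatalogs()`, and `IllegalStateException` stands in for `SqlGatewayException` so that the snippet compiles without Flink on the classpath.

```java
import java.util.Map;
import java.util.Optional;

public class DefaultCatalogCheck {

    /**
     * Hypothetical helper mirroring the branch under review: use the
     * session's default catalog if one is set, otherwise fall back to the
     * built-in name, but fail if a registered catalog already uses that name.
     */
    static String resolveDefaultCatalogName(
            Optional<String> sessionDefaultCatalog,
            Map<String, ?> registeredCatalogs,
            String builtInCatalogName) {
        if (sessionDefaultCatalog.isPresent()) {
            return sessionDefaultCatalog.get();
        }
        if (registeredCatalogs.containsKey(builtInCatalogName)) {
            // Stand-in for SqlGatewayException in the real code.
            throw new IllegalStateException(
                    String.format(
                            "The name of the registered catalog conflicts with the built-in default catalog name: %s.",
                            builtInCatalogName));
        }
        return builtInCatalogName;
    }

    public static void main(String[] args) {
        // No session default and no conflicting registration: built-in name is used.
        System.out.println(
                resolveDefaultCatalogName(Optional.empty(), Map.of(), "default_catalog"));
    }
}
```

Failing fast here avoids silently shadowing a user-registered catalog with the built-in one.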



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -385,5 +396,13 @@ public SessionState(
             this.resourceManager = resourceManager;
             this.functionCatalog = functionCatalog;
         }
+
+        private void registerModuleAtHead(String moduleName, Module module) {
+            Deque<String> moduleNames = new ArrayDeque<>(moduleManager.listModules());
+            moduleNames.addFirst(moduleName);
+
+            moduleManager.loadModule(moduleName, module);
+            moduleManager.useModules(moduleNames.toArray(new String[0]));
+        }

Review Comment:
   Why not add a method named `buildModuleManager`?
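
As a rough sketch of what such a builder would have to compute, the ordering logic of `registerModuleAtHead` can be isolated from `ModuleManager`. Everything here is an assumption made for illustration: `ModuleOrderSketch` and `buildModuleOrder` are hypothetical names, and plain lists of module names stand in for the module manager's state and for the modules registered in the session environment.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class ModuleOrderSketch {

    /**
     * Hypothetical helper: returns the module resolution order obtained by
     * placing each registered module, in turn, at the head of the modules
     * that are already in use.
     */
    static List<String> buildModuleOrder(
            List<String> usedModules, List<String> registeredModules) {
        Deque<String> order = new ArrayDeque<>(usedModules);
        for (String name : registeredModules) {
            // Same step as moduleNames.addFirst(moduleName) in the diff.
            order.addFirst(name);
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(buildModuleOrder(List.of("core"), List.of("hive", "custom")));
    }
}
```

Note that with repeated head insertion the module registered last ends up first in the resolution order, which a `buildModuleManager` method would need to make explicit.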



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
