xuyangzhong commented on code in PR #23990: URL: https://github.com/apache/flink/pull/23990#discussion_r1522841798
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java:
##########
@@ -237,15 +240,14 @@ public static CatalogStoreFactory findAndCreateCatalogStoreFactory(
      * }</pre>
      */
     public static CatalogStoreFactory.Context buildCatalogStoreFactoryContext(
-            Configuration configuration, ClassLoader classLoader) {
-        String identifier = configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND);
-        String catalogStoreOptionPrefix =
-                CommonCatalogOptions.TABLE_CATALOG_STORE_OPTION_PREFIX + identifier + ".";
-        Map<String, String> options =
-                new DelegatingConfiguration(configuration, catalogStoreOptionPrefix).toMap();
-        CatalogStoreFactory.Context context =
-                new FactoryUtil.DefaultCatalogStoreContext(options, configuration, classLoader);
-
-        return context;
+            ReadableConfig readableConfig, ClassLoader classLoader) {
+        String identifier = readableConfig.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND);
+        ConfigOption<Map<String, String>> catalogStoreMapOptions =

Review Comment:
My only concern is with this change: it places only a weak, unenforced constraint on the options for CatalogStore (the value must be a string). If a value is not a string, an unexpected value is retrieved without any exception being thrown. After testing `configuration.setBytes(TABLE_CATALOG_STORE_OPTION_PREFIX + "file.haha", new byte[] {1, 2});`, I found that the retrieved value was wrong.

I see that an agreement has already been reached in Jira, so the choice is yours.
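To make the concern concrete, here is a minimal, Flink-free sketch of the behaviour described in the comment. It is an illustration under stated assumptions, not Flink's actual implementation: the class `PrefixMapSketch`, both `extract*` methods, and the literal prefix `table.catalog-store.file.` are hypothetical names chosen for this example. The lenient variant stringifies whatever is stored under the prefix, so a `byte[]` comes back as an identity string rather than raising an error. The strict variant shows one possible way to fail fast instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of reading prefixed options as Map<String, String>.
// This does not use or reproduce Flink's Configuration class.
class PrefixMapSketch {

    // Lenient: every value under the prefix is converted with String.valueOf,
    // so non-string values are silently turned into some string.
    static Map<String, String> extractLenient(Map<String, Object> raw, String prefix) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, Object> e : raw.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                String shortKey = e.getKey().substring(prefix.length());
                out.put(shortKey, String.valueOf(e.getValue()));
            }
        }
        return out;
    }

    // Strict: reject any value under the prefix that is not already a String.
    static Map<String, String> extractStrict(Map<String, Object> raw, String prefix) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, Object> e : raw.entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                continue;
            }
            Object value = e.getValue();
            if (!(value instanceof String)) {
                throw new IllegalArgumentException(
                        "Option '" + e.getKey() + "' must be a string");
            }
            out.put(e.getKey().substring(prefix.length()), (String) value);
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumed prefix for a catalog store of kind "file".
        String prefix = "table.catalog-store.file.";

        Map<String, Object> raw = new HashMap<>();
        raw.put(prefix + "path", "/tmp/catalogs");
        raw.put(prefix + "haha", new byte[] {1, 2}); // mirrors the setBytes test

        // The byte[] is returned as an array identity string, not its content.
        System.out.println(extractLenient(raw, prefix));

        try {
            extractStrict(raw, prefix);
        } catch (IllegalArgumentException ex) {
            System.out.println("strict variant rejected it: " + ex.getMessage());
        }
    }
}
```

Whether a strict check like this is worth adding depends on the agreement already reached in Jira; the sketch only shows what "no exception is thrown" looks like in practice.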