@Shengkai

> About the catalog jar hot updates

Currently we do not have a similar requirement, but if the catalog
management interface is opened up, it could indeed enable hot loading of
catalog jars.

> do we need to instantiate the Catalog immediately or defer to the usage

I think this can stay the same as before.

@Jark

> There only can be a single catalog manager in TableEnvironment.

Big +1 for this. It avoids conflicts and also meets the catalog
persistence requirement.

Best,
Feng

On Fri, Feb 10, 2023 at 3:09 PM Jark Wu <imj...@gmail.com> wrote:
>
> Hi Feng,
>
> It's still easy to run into conflicts and inconsistency even if we have
> only one CatalogProvider, because CatalogProvider only provides read-only
> interfaces (listCatalogs, getCatalog). For example, you may register a
> catalog X but be unable to list it, because it's not in the external
> metadata service.
>
> To avoid catalog conflicts and stay consistent, we can extract the catalog
> management logic into a pluggable interface, including listCatalog,
> getCatalog, registerCatalog, unregisterCatalog, etc. The current
> CatalogManager is a default in-memory implementation; you can replace it
> with user-defined managers, such as
> - file-based: which manages catalog information in local files, just like
>   how Presto/Trino manages catalogs
> - metaservice-based: which manages catalog information in an external
>   metadata service.
>
> There can only be a single catalog manager in a TableEnvironment. This
> guarantees data consistency and avoids conflicts. This approach can also
> address another pain point of Flink SQL: the catalog information is not
> persisted.
>
> Can this approach satisfy your requirements?
>
> Best,
> Jark
>
> On Fri, 10 Feb 2023 at 11:21, Shengkai Fang <fskm...@gmail.com> wrote:
>
> > Hi Feng.
> >
> > I think your idea is very interesting!
> >
> > 1. I just wonder: after initializing the Catalog, will the Session reuse
> > the same Catalog instance or build a new one for later usage? If we reuse
> > the same Catalog, I think it's more like lazy initialization. I lean
> > slightly toward building a new one, because that makes hot updates of
> > catalog jars easier.
> >
> > 2. Users use the `CREATE CATALOG` statement in the CatalogManager. In this
> > case, do we need to instantiate the Catalog immediately or defer it until
> > it is used?
> >
> > Best,
> > Shengkai
> >
> > Feng Jin <jinfeng1...@gmail.com> wrote on Thu, Feb 9, 2023 at 20:13:
> >
> > > Thanks for your reply.
> > >
> > > @Timo
> > >
> > > > 2) avoid the default in-memory catalog and offer their catalog before
> > > > a TableEnvironment session starts
> > > > 3) whether this can be disabled and SHOW CATALOGS can be used for
> > > > listing first without having a default catalog.
> > >
> > > Regarding 2 and 3, I think this problem can be solved by introducing
> > > catalog providers, so that users can control some of the default
> > > catalog behavior.
> > >
> > > > We could also use the org.apache.flink.table.factories.Factory infra
> > > > and allow catalog providers via pure string properties
> > >
> > > I think this is also very useful. Our usage scenarios usually involve
> > > multi-cluster management, where it is also necessary to pass
> > > different configurations through parameters.
> > >
> > > @Jark @Hang
> > >
> > > > About the lazy catalog initialization
> > >
> > > Our needs may be different. If these properties already exist in an
> > > external system, especially when there may be thousands of these
> > > catalog properties, I don't think it is necessary to register all
> > > of them in the Flink env at startup. What we need is to register a
> > > catalog only when it is needed, and to fetch its properties from the
> > > external meta system at that point.
> > >
> > > > It may be hard to avoid conflicts and duplicates between
> > > > CatalogProvider and CatalogManager
> > >
> > > It is indeed easy to conflict. My idea is to factor the catalog
> > > management of the current CatalogManager out as the default
> > > CatalogProvider behavior and to allow only one CatalogProvider
> > > per Flink env. This may avoid catalog conflicts.
> > >
> > > Best,
> > > Feng
> > >
> > > On Tue, Feb 7, 2023 at 1:01 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
> > > >
> > > > Hi Feng,
> > > >
> > > > I agree with what Jark said. I think what you are looking for is lazy
> > > > initialization.
> > > >
> > > > I don't think we should introduce the new interface CatalogProvider for
> > > > lazy initialization. What we should do is store the catalog properties
> > > > and initialize the catalog when we need it. Could you please describe
> > > > some other scenarios, besides lazy initialization, in which we need the
> > > > CatalogProvider?
> > > >
> > > > If we really need the CatalogProvider, I think it is better for it to
> > > > be a single instance. Multiple instances are difficult to manage, and
> > > > there can be name conflicts among providers.
> > > >
> > > > Best,
> > > > Hang
> > > >
> > > > Jark Wu <imj...@gmail.com> wrote on Tue, Feb 7, 2023 at 10:48:
> > > >
> > > > > Hi Feng,
> > > > >
> > > > > I think this feature makes a lot of sense. If I understand correctly,
> > > > > what you are looking for is lazy catalog initialization.
> > > > >
> > > > > However, I have some concerns about introducing CatalogProvider, which
> > > > > delegates catalog management to users. It may be hard to avoid
> > > > > conflicts and duplicates between CatalogProvider and CatalogManager.
> > > > > Is it possible to have a built-in CatalogProvider to instantiate
> > > > > catalogs lazily?
> > > > >
> > > > > An idea in my mind is to introduce another catalog registration API
> > > > > that does not instantiate the catalog, e.g., registerCatalog(String
> > > > > catalogName, Map<String, String> catalogProperties). The catalog
> > > > > information is stored in CatalogManager as pure strings. The catalog
> > > > > is instantiated and initialized when used.
> > > > >
> > > > > This new API is very similar to other pure-string metadata
> > > > > registration, such as "createTable(String path, TableDescriptor
> > > > > descriptor)" and "createFunction(String path, String className,
> > > > > List<ResourceUri> resourceUris)".
> > > > >
> > > > > Can this approach satisfy your requirement?
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > > On Mon, 6 Feb 2023 at 22:53, Timo Walther <twal...@apache.org> wrote:
> > > > >
> > > > > > Hi Feng,
> > > > > >
> > > > > > this is indeed a good proposal.
> > > > > >
> > > > > > 1) It makes sense to improve the catalog listing for platform
> > > > > > providers.
> > > > > >
> > > > > > 2) Other feedback from the past has shown that users would like to
> > > > > > avoid the default in-memory catalog and offer their catalog before a
> > > > > > TableEnvironment session starts.
> > > > > >
> > > > > > 3) Also we might reconsider whether a default catalog and default
> > > > > > database make sense. Or whether this can be disabled and SHOW
> > > > > > CATALOGS can be used for listing first without having a default
> > > > > > catalog.
> > > > > >
> > > > > > What do you think about option 2 and 3?
> > > > > >
> > > > > > In any case, I would propose we pass a CatalogProvider to
> > > > > > EnvironmentSettings and only allow a single instance. Catalogs
> > > > > > should never shadow other catalogs.
> > > > > >
> > > > > > We could also use the org.apache.flink.table.factories.Factory infra
> > > > > > and allow catalog providers via pure string properties. Not sure if
> > > > > > we need this in the first version though.
> > > > > >
> > > > > > Cheers,
> > > > > > Timo
> > > > > >
> > > > > > On 06.02.23 11:21, Feng Jin wrote:
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > The original discussion address is
> > > > > > > https://issues.apache.org/jira/browse/FLINK-30126
> > > > > > >
> > > > > > > Currently, Flink has access to many systems, including kafka, hive,
> > > > > > > iceberg, hudi, elasticsearch, mysql... The corresponding catalog
> > > > > > > names might be:
> > > > > > > kafka_cluster1, kafka_cluster2, hive_cluster1, hive_cluster2,
> > > > > > > iceberg_cluster2, elasticsearch_cluster1, mysql_database1_xxx,
> > > > > > > mysql_database2_xxxx
> > > > > > >
> > > > > > > As the platform for Flink SQL jobs, we need to maintain the meta
> > > > > > > information of each system in the company, and when a Flink job
> > > > > > > starts, we need to register the catalogs with the Flink table
> > > > > > > environment, so that users can use any table through the
> > > > > > > env.executeSql interface.
> > > > > > >
> > > > > > > When we only have a small number of catalogs, we can register them
> > > > > > > like this, but when there are thousands of catalogs, I think there
> > > > > > > needs to be a dynamic loading mechanism so that we can register a
> > > > > > > catalog only when it is needed. This speeds up the initialization
> > > > > > > of the table environment and avoids useless catalog registration.
> > > > > > >
> > > > > > > Preliminary thoughts:
> > > > > > >
> > > > > > > A new CatalogProvider interface can be added.
> > > > > > > It contains two methods:
> > > > > > > * listCatalogs(), which lists all the catalogs that the provider
> > > > > > >   can supply
> > > > > > > * getCatalog(), which gets a catalog instance by catalog name.
> > > > > > >
> > > > > > > ```java
> > > > > > > public interface CatalogProvider {
> > > > > > >
> > > > > > >     default void initialize(ClassLoader classLoader, ReadableConfig config) {}
> > > > > > >
> > > > > > >     Optional<Catalog> getCatalog(String catalogName);
> > > > > > >
> > > > > > >     Set<String> listCatalogs();
> > > > > > > }
> > > > > > > ```
> > > > > > >
> > > > > > > The corresponding implementation in CatalogManager is as follows:
> > > > > > >
> > > > > > > ```java
> > > > > > > public class CatalogManager {
> > > > > > >     private @Nullable CatalogProvider catalogProvider;
> > > > > > >
> > > > > > >     private final Map<String, Catalog> catalogs = new HashMap<>();
> > > > > > >
> > > > > > >     public void setCatalogProvider(CatalogProvider catalogProvider) {
> > > > > > >         this.catalogProvider = catalogProvider;
> > > > > > >     }
> > > > > > >
> > > > > > >     public Optional<Catalog> getCatalog(String catalogName) {
> > > > > > >         // Catalogs registered in the table environment are looked up first.
> > > > > > >         Catalog registered = catalogs.get(catalogName);
> > > > > > >         if (registered != null) {
> > > > > > >             return Optional.of(registered);
> > > > > > >         }
> > > > > > >         // If there is no corresponding catalog in catalogs,
> > > > > > >         // get the catalog from the catalogProvider.
> > > > > > >         if (catalogProvider != null) {
> > > > > > >             return catalogProvider.getCatalog(catalogName);
> > > > > > >         }
> > > > > > >         return Optional.empty();
> > > > > > >     }
> > > > > > > }
> > > > > > > ```
> > > > > > >
> > > > > > > Possible problems:
> > > > > > >
> > > > > > > 1. Catalog name conflict: how do we choose when a registered catalog
> > > > > > > and a catalog provided by the catalog-provider conflict?
> > > > > > > I prefer tableEnv-registered ones over catalogs provided by the
> > > > > > > catalog-provider. If the user wishes to reference the catalog
> > > > > > > provided by the catalog-provider, they can unregister the catalog in
> > > > > > > tableEnv through the `unregisterCatalog` interface.
> > > > > > >
> > > > > > > 2. Number of CatalogProviders: is it possible to have multiple
> > > > > > > catalogProvider implementations?
> > > > > > > I don't have a good idea of this at the moment. If multiple
> > > > > > > catalogProviders are supported, it brings much more convenience,
> > > > > > > but there may be catalog name conflicts between different
> > > > > > > catalogProviders.
> > > > > > >
> > > > > > > Looking forward to your reply, any feedback is appreciated!
> > > > > > >
> > > > > > > Best.
> > > > > > >
> > > > > > > Feng Jin
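
To make the lazy-registration idea from this thread concrete (registerCatalog(String catalogName, Map<String, String> catalogProperties), with the catalog instantiated only when it is first used), here is a minimal sketch. It deliberately does not use any Flink classes: SimpleCatalog, LazyCatalogRegistry, LazyCatalogDemo, and the factory callback are hypothetical names made up for illustration, and this is not how CatalogManager is implemented. It also caches the instance after first use, which is only one of the two options raised above (reuse versus rebuild).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;

/** Small demo: registration is cheap, instantiation happens on first access. */
class LazyCatalogDemo {
    public static void main(String[] args) {
        LazyCatalogRegistry registry = new LazyCatalogRegistry((name, props) -> {
            System.out.println("instantiating " + name + " with " + props);
            return () -> name;
        });
        registry.registerCatalog("hive_cluster1", Collections.singletonMap("type", "hive"));
        registry.registerCatalog("kafka_cluster1", Collections.singletonMap("type", "kafka"));

        // Listing only reads the stored names; nothing is instantiated here.
        System.out.println(registry.listCatalogs());
        // The first lookup triggers the factory for this one catalog only.
        registry.getCatalog("hive_cluster1").ifPresent(c -> System.out.println("got " + c.name()));
    }
}

/** Hypothetical stand-in for Flink's Catalog, reduced to what this sketch needs. */
interface SimpleCatalog {
    String name();
}

/** Keeps catalog properties as plain strings and builds each catalog on first use. */
class LazyCatalogRegistry {
    private final Map<String, Map<String, String>> properties = new LinkedHashMap<>();
    private final Map<String, SimpleCatalog> instances = new HashMap<>();
    // Assumed factory: turns (name, properties) into a catalog instance.
    private final BiFunction<String, Map<String, String>, SimpleCatalog> factory;

    LazyCatalogRegistry(BiFunction<String, Map<String, String>, SimpleCatalog> factory) {
        this.factory = factory;
    }

    /** Registers a catalog by name and properties without instantiating it. */
    void registerCatalog(String catalogName, Map<String, String> catalogProperties) {
        if (properties.containsKey(catalogName)) {
            throw new IllegalArgumentException("Catalog already registered: " + catalogName);
        }
        properties.put(catalogName, new HashMap<>(catalogProperties));
    }

    /** Drops both the stored properties and any cached instance. */
    void unregisterCatalog(String catalogName) {
        properties.remove(catalogName);
        instances.remove(catalogName);
    }

    /** Lists registered names; listing never instantiates a catalog. */
    Set<String> listCatalogs() {
        return Collections.unmodifiableSet(properties.keySet());
    }

    /** Instantiates the catalog on first access and reuses the instance afterwards. */
    Optional<SimpleCatalog> getCatalog(String catalogName) {
        Map<String, String> props = properties.get(catalogName);
        if (props == null) {
            return Optional.empty();
        }
        SimpleCatalog catalog = instances.get(catalogName);
        if (catalog == null) {
            catalog = factory.apply(catalogName, props);
            instances.put(catalogName, catalog);
        }
        return Optional.of(catalog);
    }
}
```

Because a single registry owns registration, listing, and lookup, a registered catalog is always listable, which is the consistency concern raised about a read-only CatalogProvider. Rebuilding the catalog on every lookup instead of caching it would be a small change to getCatalog and would suit the jar hot-update case better.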