@Shengkai
> About the catalog jar hot updates

We do not have such a requirement at the moment, but if the catalog
management interface is opened up, it could indeed be used to
hot-load catalog jars.


>  do we need to instantiate the Catalog immediately or defer to the usage

I think this can stay the same as before.



@Jark
> There can only be a single catalog manager in TableEnvironment.

Big +1 for this. It avoids conflicts and also meets the catalog
persistence requirement.
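
To check that I read the proposal correctly, here is a rough sketch of
such a pluggable manager. Everything in it is hypothetical:
`PluggableCatalogManager` and `InMemoryCatalogManager` are made-up names,
not existing Flink classes, and a catalog is represented only by its
string properties so that the sketch stays self-contained.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical pluggable contract; not an existing Flink interface. */
interface PluggableCatalogManager {
    Set<String> listCatalogs();

    Optional<Map<String, String>> getCatalog(String catalogName);

    void registerCatalog(String catalogName, Map<String, String> properties);

    void unregisterCatalog(String catalogName);
}

/** Default in-memory variant: nothing is persisted across restarts. */
class InMemoryCatalogManager implements PluggableCatalogManager {
    private final Map<String, Map<String, String>> catalogs = new ConcurrentHashMap<>();

    @Override
    public Set<String> listCatalogs() {
        // Return a snapshot so callers cannot modify the internal state.
        return Collections.unmodifiableSet(new HashSet<>(catalogs.keySet()));
    }

    @Override
    public Optional<Map<String, String>> getCatalog(String catalogName) {
        return Optional.ofNullable(catalogs.get(catalogName));
    }

    @Override
    public void registerCatalog(String catalogName, Map<String, String> properties) {
        Map<String, String> copy = Collections.unmodifiableMap(new HashMap<>(properties));
        // Reject duplicates so one catalog never silently shadows another.
        if (catalogs.putIfAbsent(catalogName, copy) != null) {
            throw new IllegalStateException("Catalog already exists: " + catalogName);
        }
    }

    @Override
    public void unregisterCatalog(String catalogName) {
        catalogs.remove(catalogName);
    }
}
```

A file-based or metaservice-based manager would implement the same
interface and persist the properties instead of keeping them in a map.
Because listing and registration go through the same object, a
registered catalog can always be listed.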


Best,
Feng

On Fri, Feb 10, 2023 at 3:09 PM Jark Wu <imj...@gmail.com> wrote:
>
> Hi Feng,
>
> It's still easy to conflict and be inconsistent even if we have only one
> CatalogProvider, because CatalogProvider only provides readable interfaces
> (listCatalogs, getCatalog). For example, you may register a catalog X, but
> can't list it because it's not in the external metadata service.
>
> To avoid catalog conflicts and keep consistent, we can extract the catalog
> management logic as a pluggable interface, including listCatalog,
> getCatalog, registerCatalog, unregisterCatalog, etc. The
> current CatalogManager is a default in-memory implementation; you can
> replace it with user-defined managers, such as
>  - file-based: which manages catalog information on local files, just like
> how Presto/Trino manages catalogs
>  - metaservice-based: which manages catalog information on external
> metadata service.
>
> There can only be a single catalog manager in TableEnvironment. This
> guarantees data consistency and avoids conflicts. This approach can address
> another pain point of Flink SQL: the catalog information is not persisted.
>
> Can this approach satisfy your requirements?
>
> Best,
> Jark
>
>
>
>
>
> On Fri, 10 Feb 2023 at 11:21, Shengkai Fang <fskm...@gmail.com> wrote:
>
> > Hi Feng.
> >
> > I think your idea is very interesting!
> >
> > 1. I just wonder: after the Catalog is initialized, will the Session reuse the
> > same Catalog instance or build a new one for later usage? If we reuse the
> > same Catalog, I think it's more like lazy initialization. I lean
> > toward building a new one because that makes catalog jar hot updates
> > easier for us.
> >
> > 2. When users register a catalog in the CatalogManager with the
> > `CREATE CATALOG` statement, do we need to instantiate the Catalog
> > immediately or defer it until first use?
> >
> > Best,
> > Shengkai
> >
> > On Thu, Feb 9, 2023 at 8:13 PM Feng Jin <jinfeng1...@gmail.com> wrote:
> >
> > > Thanks for your reply.
> > >
> > > @Timo
> > >
> > > > 2) avoid the default in-memory catalog and offer their catalog before
> > > > a TableEnvironment session starts
> > > > 3) whether this can be disabled and SHOW CATALOGS can be used for
> > > > listing first without having a default catalog.
> > >
> > >
> > > Regarding 2 and 3, I think both can be addressed by introducing
> > > catalog providers, which would let users control some of the default
> > > catalog behavior.
> > >
> > >
> > > > We could also use the org.apache.flink.table.factories.Factory infra
> > > > and allow catalog providers via pure string properties
> > >
> > > I think this is also very useful. We usually manage multiple
> > > clusters, so we also need to pass different configurations in
> > > through parameters.
> > >
> > >
> > > @Jark @Hang
> > >
> > > >  About the lazy catalog initialization
> > >
> > > Our needs may be different. If these properties already exist in an
> > > external system, especially when there may be thousands of
> > > catalogs, I don't think it is necessary to register all of
> > > them in the Flink env at startup. What we need is to
> > > register a catalog only when it is needed, fetching its properties
> > > from the external meta system at that point.
> > >
> > >
> > > > It may be hard to avoid conflicts and duplicates between
> > > > CatalogProvider and CatalogManager
> > >
> > > It is indeed easy to conflict. My idea is to move the catalog
> > > management of the current CatalogManager into a default
> > > CatalogProvider, and to allow only one CatalogProvider
> > > per Flink Env. This may avoid catalog conflicts.
> > >
> > >
> > > Best,
> > > Feng
> > >
> > > On Tue, Feb 7, 2023 at 1:01 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
> > > >
> > > > Hi Feng,
> > > > I agree with what Jark said. I think what you are looking for is lazy
> > > > initialization.
> > > >
> > > > I don't think we should introduce the new interface CatalogProvider for
> > > > lazy initialization. What we should do is to store the catalog properties
> > > > and initialize the catalog when we need it. Could you please describe some
> > > > other scenarios that need the CatalogProvider besides lazy
> > > > initialization?
> > > >
> > > > If we really need the CatalogProvider, I think it is better to have a single
> > > > instance. Multiple instances are difficult to manage, and there can be name
> > > > conflicts among providers.
> > > >
> > > > Best,
> > > > Hang
> > > >
> > > > On Tue, Feb 7, 2023 at 10:48 AM Jark Wu <imj...@gmail.com> wrote:
> > > >
> > > > > Hi Feng,
> > > > >
> > > > > I think this feature makes a lot of sense. If I understand correctly, what
> > > > > you are looking for is lazy catalog initialization.
> > > > >
> > > > > However, I have some concerns about introducing CatalogProvider, which
> > > > > delegates catalog management to users. It may be hard to avoid conflicts
> > > > > and duplicates between CatalogProvider and CatalogManager. Is it possible
> > > > > to have a built-in CatalogProvider to instantiate catalogs lazily?
> > > > >
> > > > > An idea in my mind is to introduce another catalog registration API
> > > > > without instantiating the catalog, e.g., registerCatalog(String
> > > > > catalogName, Map<String, String> catalogProperties). The catalog
> > > > > information is stored in CatalogManager as pure strings. The catalog is
> > > > > instantiated and initialized when used.
> > > > >
> > > > > This new API is very similar to other pure-string metadata registration,
> > > > > such as "createTable(String path, TableDescriptor descriptor)" and
> > > > > "createFunction(String path, String className, List<ResourceUri>
> > > > > resourceUris)".
> > > > >
> > > > > Can this approach satisfy your requirement?
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > > On Mon, 6 Feb 2023 at 22:53, Timo Walther <twal...@apache.org> wrote:
> > > > >
> > > > > > Hi Feng,
> > > > > >
> > > > > > this is indeed a good proposal.
> > > > > >
> > > > > > 1) It makes sense to improve the catalog listing for platform providers.
> > > > > >
> > > > > > 2) Other feedback from the past has shown that users would like to avoid
> > > > > > the default in-memory catalog and offer their catalog before a
> > > > > > TableEnvironment session starts.
> > > > > >
> > > > > > 3) Also we might reconsider whether a default catalog and default
> > > > > > database make sense. Or whether this can be disabled and SHOW CATALOGS
> > > > > > can be used for listing first without having a default catalog.
> > > > > >
> > > > > > What do you think about option 2 and 3?
> > > > > >
> > > > > > In any case, I would propose we pass a CatalogProvider to
> > > > > > EnvironmentSettings and only allow a single instance. Catalogs should
> > > > > > never shadow other catalogs.
> > > > > >
> > > > > > We could also use the org.apache.flink.table.factories.Factory infra and
> > > > > > allow catalog providers via pure string properties. Not sure if we need
> > > > > > this in the first version though.
> > > > > >
> > > > > > Cheers,
> > > > > > Timo
> > > > > >
> > > > > >
> > > > > > On 06.02.23 11:21, Feng Jin wrote:
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > The original discussion is at
> > > > > > > https://issues.apache.org/jira/browse/FLINK-30126
> > > > > > >
> > > > > > > Currently, Flink has access to many systems, including kafka, hive,
> > > > > > > iceberg, hudi, elasticsearch, mysql... The corresponding catalog names
> > > > > > > might be:
> > > > > > > kafka_cluster1, kafka_cluster2, hive_cluster1, hive_cluster2,
> > > > > > > iceberg_cluster2, elasticsearch_cluster1, mysql_database1_xxx,
> > > > > > > mysql_database2_xxxx
> > > > > > >
> > > > > > > As the platform for Flink SQL jobs, we need to maintain the meta
> > > > > > > information of each system in the company, and when a Flink job
> > > > > > > starts, we need to register the catalogs with the Flink table
> > > > > > > environment, so that users can use any table through the
> > > > > > > env.executeSql interface.
> > > > > > >
> > > > > > > When we only have a small number of catalogs, we can register them like
> > > > > > > this, but when there are thousands of catalogs, I think there
> > > > > > > needs to be a dynamic loading mechanism so that we can register a catalog
> > > > > > > only when needed, speed up the initialization of the table environment, and
> > > > > > > avoid useless catalog registration.
> > > > > > >
> > > > > > > Preliminary thoughts:
> > > > > > >
> > > > > > > A new CatalogProvider interface can be added.
> > > > > > > It contains two methods:
> > > > > > > * listCatalogs(), which lists all the catalogs that the
> > > > > > > provider can provide
> > > > > > > * getCatalog(), which gets a catalog instance by catalog name.
> > > > > > >
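> > > > > > > ```java
> > > > > > > import java.util.Optional;
> > > > > > > import java.util.Set;
> > > > > > >
> > > > > > > import org.apache.flink.configuration.ReadableConfig;
> > > > > > > import org.apache.flink.table.catalog.Catalog;
> > > > > > >
> > > > > > > public interface CatalogProvider {
> > > > > > >
> > > > > > >     default void initialize(ClassLoader classLoader, ReadableConfig config) {}
> > > > > > >
> > > > > > >     // Gets a catalog instance by catalog name.
> > > > > > >     Optional<Catalog> getCatalog(String catalogName);
> > > > > > >
> > > > > > >     // Lists the names of all catalogs this provider can provide.
> > > > > > >     Set<String> listCatalogs();
> > > > > > > }
> > > > > > > ```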
> > > > > > >
> > > > > > >
> > > > > > > The corresponding implementation in CatalogManager is as follows:
> > > > > > >
> > > > > > > ```java
> > > > > > > public class CatalogManager {
> > > > > > >     private @Nullable CatalogProvider catalogProvider;
> > > > > > >
> > > > > > >     private Map<String, Catalog> catalogs;
> > > > > > >
> > > > > > >     public void setCatalogProvider(CatalogProvider catalogProvider) {
> > > > > > >         this.catalogProvider = catalogProvider;
> > > > > > >     }
> > > > > > >
> > > > > > >     public Optional<Catalog> getCatalog(String catalogName) {
> > > > > > >         // Catalogs registered in the manager take precedence.
> > > > > > >         Catalog catalog = catalogs.get(catalogName);
> > > > > > >         if (catalog != null) {
> > > > > > >             return Optional.of(catalog);
> > > > > > >         }
> > > > > > >         // If there is no corresponding catalog in catalogs,
> > > > > > >         // get catalog by catalogProvider
> > > > > > >         if (catalogProvider != null) {
> > > > > > >             return catalogProvider.getCatalog(catalogName);
> > > > > > >         }
> > > > > > >         return Optional.empty();
> > > > > > >     }
> > > > > > > }
> > > > > > > ```
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Possible problems:
> > > > > > >
> > > > > > > 1. Catalog name conflict: how to choose when a registered catalog
> > > > > > > and a catalog provided by the catalog-provider conflict?
> > > > > > > I prefer tableEnv-registered ones over catalogs provided by the
> > > > > > > catalog-provider. If the user wishes to reference the catalog provided
> > > > > > > by the catalog-provider, they can unregister the catalog in tableEnv
> > > > > > > through the `unregisterCatalog` interface.
> > > > > > >
> > > > > > > 2. Number of CatalogProviders: is it possible to have multiple
> > > > > > > catalogProvider implementations?
> > > > > > > I don't have a good answer to this at the moment. Supporting multiple
> > > > > > > catalogProviders would be much more convenient, but
> > > > > > > there may be catalog name conflicts between different
> > > > > > > catalogProviders.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Looking forward to your reply, any feedback is appreciated!
> > > > > > >
> > > > > > >
> > > > > > > Best.
> > > > > > >
> > > > > > > Feng Jin
> > > > > > >
> > > > > >
> > > > > >
> > > > >
> > >
> >
