Hi all, I updated FLIP based on your feedback:
1. Introduce interfaces: GenericCatalog, ManagedTableFactory, TableDescriptor.forManaged 2. Introduce log.scan.startup.mode (default initial) to Hybrid source. 3. Add description to miss dropped table. Best, Jingsong On Mon, Oct 25, 2021 at 3:39 PM Jingsong Li <jingsongl...@gmail.com> wrote: > > Hi Ingo, > > Really appreciate your feedback. > > #1. The reason why we insist on using no "connector" option is that we > want to bring the following design to users: > - With the "connector" option, it is a mapping, unmanaged table. > - Without the "connector" option, it is a managed table. It may be an > Iceberg managed table, or may be a JDBC managed table, or may be a > Flink managed table. > > #2. About: > CREATE TABLE T (f0 INT); > ALTER TABLE T SET ('connector' = '…'); > > I think it is dangerous, even for a generic table. The managed table > should prohibit it. > > #3. DDL and Table API > > You are right, Table Api should be a superset of SQL. There is no > doubt that it should support BDT. > > Best, > Jingsong > > On Mon, Oct 25, 2021 at 2:18 PM Ingo Bürk <i...@ververica.com> wrote: > > > > Hi Jingsong, > > > > thanks again for the answers. I think requiring catalogs to implement an > > interface to support BDTs is something we'll need (though personally I > > still prefer explicit DDL here over the "no connector option" approach). > > > > What about more edge cases like > > > > CREATE TABLE T (f0 INT); > > ALTER TABLE T SET ('connector' = '…'); > > > > This would have to first create the physical storage and then delete it > > again, right? > > > > On a separate note, he FLIP currently only discusses SQL DDL, and you have > > also mentioned > > > > > BDT only can be dropped by Flink SQL DDL now. > > > > Something Flink suffers from a lot is inconsistencies across APIs. I think > > it is important that we support features on all major APIs, i.e. including > > Table API. > > For example for creating a BDT this would mean e.g. adding something like > > #forManaged(…) to TableDescriptor. > > > > > > Best > > Ingo > > > > On Mon, Oct 25, 2021 at 5:27 AM Jingsong Li <jingsongl...@gmail.com> wrote: > > > > > Hi Ingo, > > > > > > I thought again. > > > > > > I'll try to sort out the current catalog behaviors. > > > Actually, we can divide catalogs into three categories: > > > > > > 1. ExternalCatalog: it can only read or create a single table kind > > > which connects to external storage. TableFactory is provided by > > > Catalog, which can have nothing to do with Flink's Factory discovery > > > mechanism, such as IcebergCatalog, JdbcCatalog, PostgresCatalog, etc. > > > Catalog manages the life cycle of its **managed** tables, which means > > > that creation and drop will affect the real physical storage. The DDL > > > has no "connector" option. > > > > > > 2. GenericCatalog (or FlinkCatalog): only Flink tables are saved and > > > factories are created through Flink's factory discovery mechanism. At > > > this time, the catalog is actually only a storage medium for saving > > > schema and options, such as GenericInMemoryCatalog. Catalog only saves > > > meta information and does not manage the underlying physical storage > > > of tables. These tables are **unmanaged**. The DDL must have a > > > "connector" option. > > > > > > 3. HybridCatalog: It can save both its own **managed** table and > > > generic Flink **unmanaged** table, such as HiveCatalog. > > > > > > We want to use the "connector" option to distinguish whether it is > > > managed or not. > > > > > > Now, consider the Flink managed table in this FLIP. > > > a. ExternalCatalog can not support Flink managed tables. > > > b. GenericCatalog can support Flink managed tables without the > > > "connector" option. > > > c. What about HybridCatalog (HiveCatalog)? Yes, we want HiveCatalog to > > > support Flink managed tables: > > > - with "connector" option in Flink dialect is unmanaged tables > > > - Hive DDL in Hive dialect is Hive managed tables, the parser will add > > > "connector = hive" automatically. At present, there are many > > > differences between Flink DDL and Hive DDL, and even their features > > > have many differences. > > > - without "connector" option in Flink dialect is Flink managed tables. > > > > > > In this way, we can support Flink managed tables while maintaining > > > compatibility. > > > > > > Anyway, we need introduce a "SupportsFlinkManagedTable" to catalog. > > > > > > ############## Back to your question ################# > > > > > > > but we should make it clear that this is a limitation and probably > > > document how users can clean up the underlying physical storage manually > > > in > > > this case > > > > > > Yes, it's strange that the catalog should manage tables, but some > > > catalogs don't have this ability. > > > - For PersistentCatalog, the meta will continue until the underlying > > > physical storage is deleted. > > > - For InMemoryCatalog, yes, we should document it for the underlying > > > physical storage of Flink managed tables. > > > > > > > the HiveCatalog doesn't list a 'connector' option for its tables. > > > > > > Actually, It can be divided into two steps: create and save: > > > - When creating a table, the table seen by HiveCatalog must have > > > "connector = hive", which is the hive table (Hive managed table). You > > > can see the "HiveCatalog.isHiveTable". > > > - When saving the table, it will remove the connector of the hive > > > table. We can do this: with "connector" option is Flink generic table, > > > without "connector" option is Hive table, with "flink-managed = true" > > > is Flink managed table. > > > > > > Best, > > > Jingsong Lee > > > > > > On Thu, Oct 21, 2021 at 8:23 PM Ingo Bürk <i...@ververica.com> wrote: > > > > > > > > Hi JingSong, > > > > > > > > thank you for the answers! > > > > > > > > > BDT only can be dropped by Flink SQL DDL now. > > > > > > > > Maybe I'm misunderstanding, but that's only true from the Flink side. > > > What > > > > I meant is that a table could disappear from a catalog entirely outside > > > of > > > > Flink. As a simple example, consider a catalog which represents an IMAP > > > > mail server and each folder as a table. If a folder is deleted from the > > > > mail account, the table would disappear, but Flink would have no way of > > > > knowing that. I don't see a way around this problem, to be honest, but > > > > we > > > > should make it clear that this is a limitation and probably document how > > > > users can clean up the underlying physical storage manually in this > > > > case? > > > > > > > > > - Option 1: Create table without the connector option, the table will > > > > > be forcibly translated to BDT. > > > > > > > > This would be a breaking change, right? If I remember correctly (but I > > > > might not :-)), even the HiveCatalog doesn't list a 'connector' option > > > for > > > > its tables. > > > > > > > > This approach is also very implicit, and creating physical storage isn't > > > > exactly "free", so I personally would favor one of the other approaches. > > > > Option (2) would be explicit for the end user, while Option (3) is again > > > > implicit for the user and only explicit for the catalog implementor, so > > > > I > > > > kind of favor Option (2) because I feel that users should be aware of > > > > creating a Flink-managed table. > > > > > > > > We also need to consider the upgrade path here: if a catalog exposes > > > tables > > > > without 'connector' options today, we need to make sure that once this > > > FLIP > > > > is implemented no errors are thrown because codepaths assume that > > > physical > > > > storage must exist for such tables (since they were created before the > > > > FLIP). > > > > > > > > > > > > Best > > > > Ingo > > > > > > > > On Thu, Oct 21, 2021 at 1:31 PM Jingsong Li <jingsongl...@gmail.com> > > > wrote: > > > > > > > > > Hi Ingo and wenlong, > > > > > > > > > > Thanks for your feedback. Very good questions! > > > > > > > > > > (Built-in Dynamic Table is simplified as BDT) > > > > > > > > > > First, let's look at the following questions: > > > > > > > > > > 1. Does BDT want a separate catalog or can it be placed in all > > > > > catalogs (such as InMemoryCatalog and HiveCatalog)? > > > > > - BDT wants the latter. Because in iceberg, we have seen that a > > > > > separate catalog undoubtedly needs to recreate a set of catalogs. We > > > > > often don't know whether it is Flink's HiveCatalog or iceberg's > > > > > HiveCatalog. This brings not only duplication of work, but also > > > > > confusion. > > > > > - How does catalog persist BDT? As a general Flink table, persist the > > > > > schema and options of the table. > > > > > > > > > > 2. Is Flink's DDL mapping or real physical storage? > > > > > - Mapping: creating and dropping tables only change the mapping > > > > > relationship, > > > > > - Physical storage: creating and dropping tables will actually delete > > > > > the underlying storage > > > > > - Status quo: the general connectors are all mapping, while the self > > > > > managed tables of Catalog are real storage. > > > > > - BDT wants real physical storage, because it can provide database > > > > > level experience, and BDT wants to be orthogonal to catalog. > > > > > Therefore, BDT is bound to break the current situation and become a > > > > > new concept. > > > > > > > > > > Based on the above conclusion, let's look at your question. > > > > > > > > > > To Ingo: > > > > > > > > > > > if tables are dropped externally rather than through Flink SQL DDL, > > > how > > > > > would Flink be able to remove the physical storage for it. > > > > > > > > > > BDT only can be dropped by Flink SQL DDL now. > > > > > > > > > > To wenlong: > > > > > > > > > > > How the built-in table would be persisted in Catalog? > > > > > > > > > > Just like a general Flink table, persist the schema and options of the > > > > > table. > > > > > > > > > > > Is it possible to read historical data from the file store first and > > > > > then fetch new changes from the log store? something like a hybrid > > > source, > > > > > but I think we need a mechanism to get exactly-once semantic. > > > > > > > > > > This can be implemented, but we need to save the Kafka offset of the > > > > > current checkpoint in the snapshot, so that we can accurately switch > > > > > between file and log. But this is not in MVP. > > > > > > > > > > To Ingo and wenlong: > > > > > > > > > > > Currently a catalog can provide a default table factory and would be > > > > > used as the top priority factory, what would happen after the default > > > > > factory was introduced. > > > > > > > > > > - Option 1: Create table without the connector option, the table will > > > > > be forcibly translated to BDT. > > > > > - Option 2: Introduce new grammar, for example, "CREATE MANAGED > > > > > TABLE...", this will separate from the default table of catalog. > > > > > Catalog can define its own managed tables. > > > > > - Option 3: Create table without the connector option, but introduce > > > > > interface to Catalog, for example, "SupportsFlinkManagedTable". The > > > > > catalog that can support BDT can implement > > > > > it.(InMemoryCatalog,HiveCatalog). Catalogs that do not support BDT can > > > > > implement their own managed tables.(IcebergCatalog, these catalogs do > > > > > not even support other flink tables) > > > > > > > > > > Best, > > > > > Jingsong > > > > > > > > > > On Thu, Oct 21, 2021 at 11:37 AM wenlong.lwl <wenlong88....@gmail.com> > > > > > wrote: > > > > > > > > > > > > Hi Jingsong, thanks for the proposal, providing a built-in storage > > > > > solution > > > > > > for users will make flink SQL much more easier to use in production. > > > > > > > > > > > > I have some questions which may be missed in the FLIP, but may be > > > > > important > > > > > > IMO: > > > > > > 1. Is it possible to read historical data from the file store first > > > and > > > > > > then fetch new changes from the log store? something like a hybrid > > > > > source, > > > > > > but I think we need a mechanism to get exactly-once semantic. > > > > > > 2. How the built-in table would be persisted in Catalog? > > > > > > 3. Currently a catalog can provide a default table factory and would > > > be > > > > > > used as the top priority factory, what would happen after the > > > > > > default > > > > > > factory was introduced. > > > > > > > > > > > > On Wed, 20 Oct 2021 at 19:35, Ingo Bürk <i...@ververica.com> wrote: > > > > > > > > > > > > > Hi Jingsong, > > > > > > > > > > > > > > thank you for writing up the proposal. The benefits such a > > > mechanism > > > > > will > > > > > > > bring will be very valuable! I haven't yet looked into this in > > > detail, > > > > > but > > > > > > > one question came to my mind immediately: > > > > > > > > > > > > > > The DDL for these tables seems to rely on there not being a > > > 'connector' > > > > > > > option. However, catalogs can provide a custom factory, and thus > > > tables > > > > > > > don't necessarily need to contain such an option already today. > > > > > > > How > > > > > will > > > > > > > this interact / work with catalogs? I think there are more points > > > > > regarding > > > > > > > interaction with catalogs, e.g. if tables are dropped externally > > > rather > > > > > > > than through Flink SQL DDL, how would Flink be able to remove the > > > > > physical > > > > > > > storage for it. > > > > > > > > > > > > > > > > > > > > > Best > > > > > > > Ingo > > > > > > > > > > > > > > On Wed, Oct 20, 2021 at 11:14 AM Jingsong Li < > > > jingsongl...@gmail.com> > > > > > > > wrote: > > > > > > > > > > > > > > > Hi all, > > > > > > > > > > > > > > > > Kurt and I propose to introduce built-in storage support for > > > dynamic > > > > > > > > table, a truly unified changelog & table representation, from > > > Flink > > > > > > > > SQL’s perspective. We believe this kind of storage will improve > > > the > > > > > > > > usability a lot. > > > > > > > > > > > > > > > > We want to highlight some characteristics about this storage: > > > > > > > > > > > > > > > > - It’s a built-in storage for Flink SQL > > > > > > > > ** Improve usability issues > > > > > > > > ** Flink DDL is no longer just a mapping, but a real creation > > > > > > > > for > > > > > these > > > > > > > > tables > > > > > > > > ** Masks & abstracts the underlying technical details, no > > > annoying > > > > > > > options > > > > > > > > > > > > > > > > - Supports subsecond streaming write & consumption > > > > > > > > ** It could be backed by a service-oriented message queue (Like > > > > > Kafka) > > > > > > > > ** High throughput scan capability > > > > > > > > ** Filesystem with columnar formats would be an ideal choice > > > > > > > > just > > > > > like > > > > > > > > iceberg/hudi does. > > > > > > > > > > > > > > > > - More importantly, in order to solve the cognitive bar, storage > > > > > needs > > > > > > > > to automatically address various Insert/Update/Delete inputs and > > > > > table > > > > > > > > definitions > > > > > > > > ** Receive any type of changelog > > > > > > > > ** Table can have primary key or no primary key > > > > > > > > > > > > > > > > Looking forward to your feedback. > > > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage > > > > > > > > > > > > > > > > Best, > > > > > > > > Jingsong Lee > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > Best, Jingsong Lee > > > > > > > > > > > > > > > > > -- > > > Best, Jingsong Lee > > > > > > > -- > Best, Jingsong Lee -- Best, Jingsong Lee