Hi Ingo,

Really appreciate your feedback.

#1. The reason we insist on using no "connector" option is that we
want to present the following design to users (see the sketch below):
- With the "connector" option, the table is a mapping to external
storage, i.e. an unmanaged table.
- Without the "connector" option, the table is a managed table. It may
be an Iceberg managed table, a JDBC managed table, or a Flink managed
table.
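
For example (a minimal sketch in Java; the managed-table behavior is as
proposed in this FLIP, and the table names and the 'kafka' connector are
only for illustration):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inStreamingMode().build());

// Unmanaged: the 'connector' option maps the table onto existing
// external storage; CREATE/DROP only adds or removes the mapping.
tEnv.executeSql(
    "CREATE TABLE orders_kafka (f0 INT) WITH ('connector' = 'kafka')");

// Managed: no 'connector' option, so creating/dropping the table also
// creates/deletes the underlying physical storage.
tEnv.executeSql("CREATE TABLE orders (f0 INT)");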

#2. About:
CREATE TABLE T (f0 INT);
ALTER TABLE T SET ('connector' = '…');

I think this is dangerous even for a generic table; a managed table
should prohibit it. As you point out, Flink would first have to create
the physical storage and then delete it again.
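
To make the prohibition concrete, a catalog could validate option
changes before applying them. A minimal sketch (the helper name and
exception type are made up for illustration, not part of the FLIP):

import java.util.Map;

// Hypothetical guard for the alterTable() path: reject introducing a
// 'connector' option on a table that was created as a managed table.
static void checkConnectorNotAdded(
        Map<String, String> oldOptions, Map<String, String> newOptions) {
    boolean wasManaged = !oldOptions.containsKey("connector");
    if (wasManaged && newOptions.containsKey("connector")) {
        throw new IllegalStateException(
            "Cannot set 'connector' on a managed table; "
                + "drop and recreate the table instead.");
    }
}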

#3. DDL and Table API

You are right, the Table API should be a superset of SQL. There is no
doubt that it should support BDT.
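
For example, following Ingo's suggestion below, creating a managed
table from the Table API might look like this (purely hypothetical:
TableDescriptor#forManaged does not exist yet, and its shape here
simply mirrors the existing TableDescriptor.forConnector):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

// Hypothetical: no 'connector' option means a managed table.
tEnv.createTable(
    "T",
    TableDescriptor.forManaged()
        .schema(Schema.newBuilder().column("f0", DataTypes.INT()).build())
        .build());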

Best,
Jingsong

On Mon, Oct 25, 2021 at 2:18 PM Ingo Bürk <i...@ververica.com> wrote:
>
> Hi Jingsong,
>
> thanks again for the answers. I think requiring catalogs to implement an
> interface to support BDTs is something we'll need (though personally I
> still prefer explicit DDL here over the "no connector option" approach).
>
> What about more edge cases like
>
> CREATE TABLE T (f0 INT);
> ALTER TABLE T SET ('connector' = '…');
>
> This would have to first create the physical storage and then delete it
> again, right?
>
> On a separate note, the FLIP currently only discusses SQL DDL, and you have
> also mentioned
>
> > BDT can only be dropped by Flink SQL DDL now.
>
> Something Flink suffers from a lot is inconsistencies across APIs. I think
> it is important that we support features on all major APIs, i.e. including
> Table API.
> For creating a BDT, for example, this would mean adding something like
> #forManaged(…) to TableDescriptor.
>
>
> Best
> Ingo
>
> On Mon, Oct 25, 2021 at 5:27 AM Jingsong Li <jingsongl...@gmail.com> wrote:
>
> > Hi Ingo,
> >
> > I've thought about this again.
> >
> > I'll try to sort out the current catalog behaviors.
> > Actually, we can divide catalogs into three categories:
> >
> > 1. ExternalCatalog: it can only read or create a single table kind
> > which connects to external storage. TableFactory is provided by
> > Catalog, which can have nothing to do with Flink's Factory discovery
> > mechanism, such as IcebergCatalog, JdbcCatalog, PostgresCatalog, etc.
> > Catalog manages the life cycle of its **managed** tables, which means
> > that creation and drop will affect the real physical storage. The DDL
> > has no "connector" option.
> >
> > 2. GenericCatalog (or FlinkCatalog): only Flink tables are saved, and
> > factories are created through Flink's factory discovery mechanism. In
> > this case the catalog is really only a storage medium for saving the
> > schema and options, such as GenericInMemoryCatalog. The catalog only
> > saves meta information and does not manage the underlying physical
> > storage of tables. These tables are **unmanaged**. The DDL must have a
> > "connector" option.
> >
> > 3. HybridCatalog: It can save both its own **managed** table and
> > generic Flink **unmanaged** table, such as HiveCatalog.
> >
> > We want to use the "connector" option to distinguish whether it is
> > managed or not.
> >
> > Now, consider the Flink managed table in this FLIP.
> > a. ExternalCatalog cannot support Flink managed tables.
> > b. GenericCatalog can support Flink managed tables without the
> > "connector" option.
> > c. What about HybridCatalog (HiveCatalog)? Yes, we want HiveCatalog to
> > support Flink managed tables:
> > - With the "connector" option in Flink dialect, it is an unmanaged table.
> > - Hive DDL in Hive dialect creates Hive managed tables; the parser adds
> > "connector = hive" automatically. At present, there are many differences
> > between Flink DDL and Hive DDL, and even their feature sets differ.
> > - Without the "connector" option in Flink dialect, it is a Flink managed
> > table. (See the sketch below.)
> >
> > In this way, we can support Flink managed tables while maintaining
> > compatibility.
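> >
> > To make the three cases concrete, in Java (a sketch; the table names
> > are made up and tEnv is a TableEnvironment whose current catalog is a
> > HiveCatalog):
> >
> >   // Flink dialect, with 'connector': an unmanaged mapping table.
> >   tEnv.executeSql(
> >       "CREATE TABLE t1 (f0 INT) WITH ('connector' = 'kafka')");
> >
> >   // Hive dialect: the parser adds 'connector' = 'hive' automatically,
> >   // so this becomes a Hive managed table.
> >   tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> >   tEnv.executeSql("CREATE TABLE t2 (f0 INT)");
> >
> >   // Flink dialect, without 'connector': a Flink managed table.
> >   tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> >   tEnv.executeSql("CREATE TABLE t3 (f0 INT)");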
> >
> > Anyway, we need to introduce a "SupportsFlinkManagedTable" interface
> > for catalogs.
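> >
> > A minimal sketch of what that interface could look like (the exact
> > shape is still open; for now it could even be a pure marker interface):
> >
> >   // Hypothetical mixin: a catalog that implements this can host Flink
> >   // managed tables (e.g. GenericInMemoryCatalog, HiveCatalog).
> >   public interface SupportsFlinkManagedTable {
> >       // marker only in this sketch; the real interface may carry
> >       // callbacks for creating/dropping the underlying storage
> >   }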
> >
> > ############## Back to your question #################
> >
> > > but we should make it clear that this is a limitation and probably
> > > document how users can clean up the underlying physical storage manually
> > > in this case
> >
> > Yes, it's strange that a catalog is expected to manage tables while some
> > catalogs don't have this ability.
> > - For a PersistentCatalog, the metadata will live on until the underlying
> > physical storage is deleted.
> > - For InMemoryCatalog, yes, we should document how to clean up the
> > underlying physical storage of Flink managed tables manually.
> >
> > > the HiveCatalog doesn't list a 'connector' option for its tables.
> >
> > Actually, it can be divided into two steps, creating and saving:
> > - When creating a table, the table seen by HiveCatalog must have
> > "connector = hive", which marks it as a Hive table (Hive managed table);
> > see "HiveCatalog.isHiveTable".
> > - When saving the table, the "connector" option of the Hive table is
> > removed. We can then do the following: with a "connector" option it is a
> > generic Flink table, without a "connector" option it is a Hive table, and
> > with "flink-managed = true" it is a Flink managed table.
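> >
> > As a sketch, the classification when reading a table back from the
> > metastore could then look like this ("flink-managed" is only a proposed
> > key, not an existing one):
> >
> >   import java.util.Map;
> >
> >   enum TableKind { GENERIC, HIVE, FLINK_MANAGED }
> >
> >   // Hypothetical: classify a table by its persisted options.
> >   static TableKind classify(Map<String, String> options) {
> >       if ("true".equals(options.get("flink-managed"))) {
> >           return TableKind.FLINK_MANAGED; // Flink managed table
> >       } else if (options.containsKey("connector")) {
> >           return TableKind.GENERIC;       // generic Flink table
> >       } else {
> >           return TableKind.HIVE;          // 'connector' stripped on save
> >       }
> >   }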
> >
> > Best,
> > Jingsong Lee
> >
> > On Thu, Oct 21, 2021 at 8:23 PM Ingo Bürk <i...@ververica.com> wrote:
> > >
> > > Hi JingSong,
> > >
> > > thank you for the answers!
> > >
> > > > BDT can only be dropped by Flink SQL DDL now.
> > >
> > > Maybe I'm misunderstanding, but that's only true from the Flink side. What
> > > I meant is that a table could disappear from a catalog entirely outside of
> > > Flink. As a simple example, consider a catalog which represents an IMAP
> > > mail server and each folder as a table. If a folder is deleted from the
> > > mail account, the table would disappear, but Flink would have no way of
> > > knowing that. I don't see a way around this problem, to be honest, but we
> > > should make it clear that this is a limitation and probably document how
> > > users can clean up the underlying physical storage manually in this case?
> > >
> > > > - Option 1: Create table without the connector option; the table will
> > > > be forcibly translated to BDT.
> > >
> > > This would be a breaking change, right? If I remember correctly (but I
> > > might not :-)), even the HiveCatalog doesn't list a 'connector' option for
> > > its tables.
> > >
> > > This approach is also very implicit, and creating physical storage isn't
> > > exactly "free", so I personally would favor one of the other approaches.
> > > Option (2) would be explicit for the end user, while Option (3) is again
> > > implicit for the user and only explicit for the catalog implementor, so I
> > > kind of favor Option (2) because I feel that users should be aware of
> > > creating a Flink-managed table.
> > >
> > > We also need to consider the upgrade path here: if a catalog exposes tables
> > > without 'connector' options today, we need to make sure that once this FLIP
> > > is implemented no errors are thrown because codepaths assume that physical
> > > storage must exist for such tables (since they were created before the
> > > FLIP).
> > >
> > >
> > > Best
> > > Ingo
> > >
> > > On Thu, Oct 21, 2021 at 1:31 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> > >
> > > > Hi Ingo and wenlong,
> > > >
> > > > Thanks for your feedback. Very good questions!
> > > >
> > > > (Built-in Dynamic Table is abbreviated as BDT below)
> > > >
> > > > First, let's look at the following questions:
> > > >
> > > > 1. Does BDT want a separate catalog or can it be placed in all
> > > > catalogs (such as InMemoryCatalog and HiveCatalog)?
> > > >  - BDT wants the latter, because with Iceberg we have seen that a
> > > > separate catalog inevitably requires recreating a whole set of catalogs.
> > > > We often don't know whether a HiveCatalog is Flink's HiveCatalog or
> > > > Iceberg's HiveCatalog. This brings not only duplication of work, but
> > > > also confusion.
> > > >  - How does a catalog persist a BDT? Just like a general Flink table:
> > > > it persists the schema and options of the table.
> > > >
> > > > 2. Is Flink's DDL a mapping or real physical storage?
> > > > - Mapping: creating and dropping tables only changes the mapping
> > > > relationship.
> > > > - Physical storage: creating and dropping tables actually creates and
> > > > deletes the underlying storage.
> > > > - Status quo: the general connectors are all mappings, while the
> > > > catalogs' self-managed tables are real storage.
> > > > - BDT wants real physical storage, because it can provide a
> > > > database-level experience, and BDT wants to be orthogonal to catalogs.
> > > > Therefore, BDT is bound to break the current situation and become a
> > > > new concept.
> > > >
> > > > Based on the above conclusion, let's look at your question.
> > > >
> > > > To Ingo:
> > > >
> > > > > if tables are dropped externally rather than through Flink SQL DDL, how
> > > > > would Flink be able to remove the physical storage for it.
> > > >
> > > > BDT can only be dropped by Flink SQL DDL now.
> > > >
> > > > To wenlong:
> > > >
> > > > > How would the built-in table be persisted in the Catalog?
> > > >
> > > > Just like a general Flink table: the schema and options of the table
> > > > are persisted.
> > > >
> > > > > Is it possible to read historical data from the file store first and
> > > > > then fetch new changes from the log store? Something like a hybrid
> > > > > source, but I think we need a mechanism to get exactly-once semantics.
> > > >
> > > > This can be implemented, but we need to save the Kafka offsets of the
> > > > current checkpoint in the snapshot, so that we can accurately switch
> > > > between file and log. But this is not in the MVP.
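> > > >
> > > > As a sketch of the idea (the snapshot layout below is an assumption,
> > > > not something the FLIP specifies yet):
> > > >
> > > >   import java.util.Map;
> > > >
> > > >   // Hypothetical snapshot metadata: the files cover the table state
> > > >   // up to these log offsets, so a hybrid reader can scan the files
> > > >   // first and then continue from the log without loss or duplication.
> > > >   final class Snapshot {
> > > >       final long snapshotId;
> > > >       final Map<Integer, Long> nextKafkaOffsetByPartition;
> > > >
> > > >       Snapshot(long snapshotId,
> > > >                Map<Integer, Long> nextKafkaOffsetByPartition) {
> > > >           this.snapshotId = snapshotId;
> > > >           this.nextKafkaOffsetByPartition = nextKafkaOffsetByPartition;
> > > >       }
> > > >   }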
> > > >
> > > > To Ingo and wenlong:
> > > >
> > > > > Currently a catalog can provide a default table factory which would be
> > > > > used as the top-priority factory; what would happen after the default
> > > > > factory was introduced?
> > > >
> > > > - Option 1: Create table without the connector option; the table will
> > > > be forcibly translated to BDT.
> > > > - Option 2: Introduce new syntax, for example "CREATE MANAGED
> > > > TABLE...". This will be separate from the catalog's default tables;
> > > > a catalog can define its own managed tables.
> > > > - Option 3: Create table without the connector option, but introduce an
> > > > interface to Catalog, for example "SupportsFlinkManagedTable". Catalogs
> > > > that can support BDT can implement it (InMemoryCatalog, HiveCatalog).
> > > > Catalogs that do not support BDT can implement their own managed tables
> > > > (IcebergCatalog; these catalogs do not even support other Flink tables).
> > > >
> > > > Best,
> > > > Jingsong
> > > >
> > > > On Thu, Oct 21, 2021 at 11:37 AM wenlong.lwl <wenlong88....@gmail.com>
> > > > wrote:
> > > > >
> > > > > Hi Jingsong, thanks for the proposal. Providing a built-in storage
> > > > > solution for users will make Flink SQL much easier to use in production.
> > > > >
> > > > > I have some questions which may have been missed in the FLIP, but may
> > > > > be important IMO:
> > > > > 1. Is it possible to read historical data from the file store first and
> > > > > then fetch new changes from the log store? Something like a hybrid
> > > > > source, but I think we need a mechanism to get exactly-once semantics.
> > > > > 2. How would the built-in table be persisted in the Catalog?
> > > > > 3. Currently a catalog can provide a default table factory which would
> > > > > be used as the top-priority factory; what would happen after the default
> > > > > factory was introduced?
> > > > >
> > > > > On Wed, 20 Oct 2021 at 19:35, Ingo Bürk <i...@ververica.com> wrote:
> > > > >
> > > > > > Hi Jingsong,
> > > > > >
> > > > > > thank you for writing up the proposal. The benefits such a mechanism
> > > > > > will bring will be very valuable! I haven't yet looked into this in
> > > > > > detail, but one question came to my mind immediately:
> > > > > >
> > > > > > The DDL for these tables seems to rely on there not being a 'connector'
> > > > > > option. However, catalogs can provide a custom factory, and thus tables
> > > > > > don't necessarily need to contain such an option already today. How will
> > > > > > this interact / work with catalogs? I think there are more points regarding
> > > > > > interaction with catalogs, e.g. if tables are dropped externally rather
> > > > > > than through Flink SQL DDL, how would Flink be able to remove the physical
> > > > > > storage for it.
> > > > > >
> > > > > >
> > > > > > Best
> > > > > > Ingo
> > > > > >
> > > > > > On Wed, Oct 20, 2021 at 11:14 AM Jingsong Li <jingsongl...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > Kurt and I propose to introduce built-in storage support for dynamic
> > > > > > > table, a truly unified changelog & table representation, from Flink
> > > > > > > SQL’s perspective. We believe this kind of storage will improve the
> > > > > > > usability a lot.
> > > > > > >
> > > > > > > We want to highlight some characteristics of this storage:
> > > > > > >
> > > > > > > - It’s a built-in storage for Flink SQL
> > > > > > > ** Improves usability
> > > > > > > ** Flink DDL is no longer just a mapping, but a real creation of these
> > > > > > > tables
> > > > > > > ** Masks & abstracts the underlying technical details, no annoying
> > > > > > > options
> > > > > > >
> > > > > > > - Supports subsecond streaming write & consumption
> > > > > > > ** It could be backed by a service-oriented message queue (like
> > > > > > > Kafka)
> > > > > > > ** High-throughput scan capability
> > > > > > > ** A filesystem with columnar formats would be an ideal choice, just
> > > > > > > as Iceberg/Hudi do.
> > > > > > >
> > > > > > > - More importantly, in order to lower the cognitive bar, the storage
> > > > > > > needs to automatically handle various Insert/Update/Delete inputs and
> > > > > > > table definitions
> > > > > > > ** Receives any type of changelog
> > > > > > > ** Tables can have a primary key or no primary key
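> > > > > > >
> > > > > > > For example, both of the following would be valid managed tables
> > > > > > > (a sketch, assuming the no-'connector' DDL discussed in the rest
> > > > > > > of this thread; tEnv is a TableEnvironment):
> > > > > > >
> > > > > > >   tEnv.executeSql(
> > > > > > >       "CREATE TABLE t1 (f0 INT, PRIMARY KEY (f0) NOT ENFORCED)");
> > > > > > >   tEnv.executeSql("CREATE TABLE t2 (f0 INT)");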
> > > > > > >
> > > > > > > Looking forward to your feedback.
> > > > > > >
> > > > > > > [1]
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> > > > > > >
> > > > > > > Best,
> > > > > > > Jingsong Lee
> > > > > > >
> > > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Best, Jingsong Lee
> > > >
> >
> >
> >
> > --
> > Best, Jingsong Lee
> >



-- 
Best, Jingsong Lee
