Hi Fabian,

I think converting an append-only table into a temporal table involves two steps: (1) converting the append-only table into a changelog table (or retraction table, as you called it), and (2) declaring the converted changelog table (which may be a view at that point) as temporal (i.e., history-tracked).
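To make the two steps concrete, here is a rough sketch in plain Python (not Flink code). The names `rates_history`, `build_versions` and `lookup_as_of` are hypothetical and the sample rows are made up. It assumes the append-only rows can be read as upserts per key: the rows are first grouped into a per-key changelog, and all versions are then kept so that a lookup as of a given time becomes possible.

```python
from bisect import bisect_right
from collections import defaultdict

# Hypothetical append-only history: (rowtime, currency, rate).
rates_history = [
    (1, "USD", 102.0),
    (1, "EUR", 114.0),
    (5, "EUR", 116.0),
    (7, "USD", 105.0),
]


def build_versions(history):
    """Read the append-only rows as upserts per key and keep every
    version, ordered by the time at which it becomes valid."""
    versions = defaultdict(list)
    for rowtime, key, value in sorted(history):
        versions[key].append((rowtime, value))
    return versions


def lookup_as_of(versions, key, ts):
    """Return the value that is valid for `key` at time `ts`,
    or None if the key has no version at or before `ts`."""
    rows = versions.get(key, [])
    # Position of the first version that starts strictly after ts.
    idx = bisect_right(rows, (ts, float("inf")))
    return rows[idx - 1][1] if idx > 0 else None
```

With `v = build_versions(rates_history)`, the call `lookup_as_of(v, "EUR", 4)` returns 114.0 and `lookup_as_of(v, "EUR", 5)` returns 116.0. Keeping all versions in step (2) is what separates this from a plain retraction table, which would only remember the latest rate per currency.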
The first step is also discussed in the FLIP-105 design draft [1], which proposes a syntax for converting an append-only table into a changelog table.

I think TEMPORAL TABLE is quite straightforward and simple, and can cover most existing changelog data in popular CDC formats. TEMPORAL VIEW is more flexible but involves more SQL code. I think we can support both.

Best,
Jark

[1]: https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#heading=h.sz656g8mb2wb

On Fri, 17 Apr 2020 at 23:52, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> I agree with most of what Timo said.
>
> The TEMPORAL keyword (which unfortunately might be easily confused with
> TEMPORARY...) looks very intuitive and I think using the only time
> attribute for versioning would be a good choice.
>
> However, TEMPORAL TABLE on retraction tables does not solve the full
> problem.
> I believe there will also be cases where we need to derive a temporal table
> from an append-only table (what TemporalTableFunctions do right now).
> I think the best choice for this would be TEMPORAL VIEW but, as I explained,
> it might be a long way until this can be supported.
> TEMPORAL VIEW would also address the problem of preprocessing.
>
> > Regarding retraction table with a primary key and a time-attribute:
> > These semantics are still unclear to me. Can retractions only occur
> > within watermarks? Or are they also used for representing late updates?
>
> Time attributes and retraction streams are a challenging topic that I
> haven't completely understood yet.
> So far we have always treated time attributes as part of the data.
> In combination with retractions, it seems that they become metadata that
> specifies when a change was made.
> I think this is different from treating time attributes as regular data.
>
> Cheers, Fabian
>
>
> On Fri, 17 Apr 2020 at 17:23, Seth Wiesman <sjwies...@gmail.com> wrote:
>
> > I really like the TEMPORAL keyword, I find it very intuitive.
> >
> > > The down side of this approach would be that an additional
> > > preprocessing step would not be possible anymore because there is no
> > > preceding view.
> >
> > Yes and no. My understanding is we are not talking about making any
> > changes to how temporal tables are defined in the Table API. Since you
> > cannot currently define temporal table functions in pure SQL
> > applications, but only pre-register them in YAML, you can't do any
> > preprocessing as it stands today. Preprocessing may be a generally
> > useful feature, I'm not sure, but this syntax does not lose us anything
> > in pure SQL applications.
> >
> > > These semantics are still unclear to me. Can retractions only occur
> > > within watermarks? Or are they also used for representing late updates?
> >
> > I do not know the SQL standard well enough to give a principled response
> > to this question. However, in my observation of production workloads,
> > users of temporal table functions use them to denormalize star schemas
> > before performing further transformations and aggregations, and expect
> > the output to be an append stream. With the ongoing work to better
> > support changelogs, the need for users to understand the differences
> > between append and upsert in their query may be diminishing, but
> > everyone else on this thread can better speak to that.
> >
> > Seth
> >
> > On Fri, Apr 17, 2020 at 10:03 AM Timo Walther <twal...@apache.org> wrote:
> >
> > > Hi Fabian,
> > >
> > > thank you very much for this great summary!
> > >
> > > I wasn't aware of the Polymorphic Table Functions standard. This is a
> > > very interesting topic that we should definitely consider in the future.
> > > Maybe this could also help us in defining tables more dynamically within
> > > a query. It could help solve problems as discussed in FLIP-113.
> > >
> > > Regarding joining:
> > >
> > > IMO we should aim for "FOR SYSTEM_TIME AS OF x" instead of the current
> > > `LATERAL TABLE(rates(x))` syntax. A function that also behaves like a
> > > table and needs this special `LATERAL` keyword during joining is not
> > > very intuitive. PTFs could be used once they are fully supported by
> > > Calcite and we have the big picture of how to also use them for other
> > > time-based operations (windows?, joins?).
> > >
> > > Regarding how to represent a temporal table:
> > >
> > > I think that our current DDL, current LookupTableSource and temporal
> > > tables can fit nicely together.
> > >
> > > How about we simply introduce an additional keyword `TEMPORAL` to
> > > indicate history tracking semantics? I think this is the minimally
> > > invasive solution:
> > >
> > > CREATE TEMPORAL TABLE rates (
> > >   currency CHAR(3) NOT NULL PRIMARY KEY,
> > >   rate DOUBLE,
> > >   rowtime TIMESTAMP,
> > >   WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
> > > WITH (...);
> > >
> > > - The primary key would be defined by the DDL.
> > > - The available time attribute would be defined by the DDL. Either as
> > >   the only time attribute of the table or we introduce a special
> > >   constraint similar to `PRIMARY KEY`.
> > >
> > > The down side of this approach would be that an additional preprocessing
> > > step would not be possible anymore because there is no preceding view.
> > >
> > > The `TEMPORAL` semantic can be stored in the properties of the table
> > > when writing to a catalog. We do the same for watermarks and computed
> > > columns.
> > >
> > > Without a `TEMPORAL` keyword, a `FOR SYSTEM_TIME AS OF x` would only
> > > work on processing time by a lookup into the external system or on
> > > event-time by using the time semantics that the external system
> > > supports.
> > >
> > > Regarding retraction table with a primary key and a time-attribute:
> > >
> > > These semantics are still unclear to me. Can retractions only occur
> > > within watermarks? Or are they also used for representing late updates?
> > >
> > > Regards,
> > > Timo
> > >
> > >
> > > On 17.04.20 14:34, Fabian Hueske wrote:
> > > > Hi all,
> > > >
> > > > First of all, I apologize for the wall of text that follows... ;-)
> > > >
> > > > A temporal table join joins an append-only table and a temporal table.
> > > > The question about how to represent a temporal table join boils down
> > > > to two questions:
> > > >
> > > > 1) How to represent a temporal table
> > > > 2) How to specify the join of an append-only table and a temporal
> > > >    table
> > > >
> > > > I'll discuss these points separately.
> > > >
> > > > # 1 How to represent a temporal table
> > > >
> > > > A temporal table is a table that can be looked up with a time
> > > > parameter and which returns the rows of the table at that point in
> > > > time / for that version.
> > > > In order to be able to (conceptually) look up previous versions, a
> > > > temporal table must be (conceptually) backed by a history table that
> > > > tracks all previous versions (see SqlServer docs [1]).
> > > > In the context of our join, we added another restriction, namely that
> > > > the table must have a primary key, i.e., there is only one row for
> > > > each version for each unique key.
> > > >
> > > > Hence, the requirements for a temporal table are:
> > > > * The temporal table has a primary key / unique attribute
> > > > * The temporal table has a time-attribute that defines the start of
> > > >   the validity interval of a row (processing time or event time)
> > > > * The system knows that the history of the table is tracked and can
> > > >   infer how to look up a version.
> > > >
> > > > There are two possible types of input from which we want to create
> > > > temporal tables (that I'm aware of):
> > > >
> > > > * append-only tables, i.e., tables that contain the full change
> > > >   history
> > > > * retraction tables, i.e., tables that are updating and do not
> > > >   remember the history.
> > > >
> > > > There are a few ways to do this:
> > > >
> > > > ## 1.1 Defining a VIEW on an append-only table with a time attribute.
> > > >
> > > > The following view definition results in a view that provides the
> > > > latest rate for each currency.
> > > >
> > > > CREATE VIEW rates AS
> > > > SELECT
> > > >   currency, MAX(rate) as rate, MAX(rowtime) as rowtime
> > > > FROM rates_history rh1
> > > > WHERE
> > > >   rh1.rowtime = (
> > > >     SELECT max(rowtime)
> > > >     FROM rates_history rh2
> > > >     WHERE rh2.currency = rh1.currency)
> > > > GROUP BY currency
> > > > WITH (
> > > >   'historytracking' = 'true',
> > > >   'historytracking.starttime' = 'rowtime');
> > > >
> > > > However, we also need to tell the system to track the history of all
> > > > changes of the view in order to be able to look it up.
> > > > That's what the properties in the WITH clause are for (inspired by
> > > > SqlServer's TEMPORAL TABLE DDL syntax).
> > > > Note that this is *not* a syntax proposal but only meant to show which
> > > > information is needed.
> > > > This view allows looking up any version of the "rates" view.
> > > >
> > > > In addition to designing and implementing the DDL syntax for views
> > > > that support temporal lookups, the optimizer would need to understand
> > > > the semantics of the view definition in depth.
> > > > Among other things it needs to understand that the MAX() aggregation
> > > > on the time-attribute preserves its watermark alignment.
> > > > AFAIK, this is not the case at the moment (the time attribute would be
> > > > converted into a regular TIMESTAMP and lose its time attribute
> > > > properties).
> > > >
> > > > ## 1.2 A retraction table with a primary key and a time-attribute.
> > > >
> > > > On paper it looks like such a table would automatically qualify as a
> > > > time-versioned table because it completely fulfills the requirements.
> > > > However, I don't think we can use it *as is* as a temporal table if we
> > > > want to have clean semantics.
> > > > The problem here is the "lost history" of the retraction table. The
> > > > dynamic table that is defined on the retraction stream only stores the
> > > > latest version (even though it sees all versions).
> > > > Conceptually, a temporal table can look up the version of the table at
> > > > any point in time because it is backed by a history table.
> > > > If this information is not available, we cannot have a semantically
> > > > clean definition of the join IMO.
> > > >
> > > > Therefore we should define the table in a way that the system knows
> > > > that the history is tracked.
> > > > MSSQL uses a syntax similar to this one:
> > > >
> > > > CREATE TABLE rates (
> > > >   currency CHAR(3) NOT NULL PRIMARY KEY,
> > > >   rate DOUBLE,
> > > >   rowtime TIMESTAMP,
> > > >   WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
> > > > WITH (
> > > >   'historytracking' = 'true',
> > > >   'historytracking.starttime' = 'rowtime');
> > > >
> > > > The 'historytracking' properties would declare that the table tracks
> > > > its history and also specify the attribute (rowtime) that is used for
> > > > versioning.
> > > >
> > > > ## 1.3 Registering a TableFunction that takes an append-only table
> > > > with time attribute
> > > >
> > > > The TableFunction requires a few parameters:
> > > > * the source table from which to derive the temporal table
> > > > * the key attribute on which the versions of the source table should
> > > >   be computed
> > > > * the time attribute that defines the versions
> > > > * a lookup timestamp for the version that is returned.
> > > >
> > > > The reasons why we chose the TableFunction approach over the VIEW
> > > > approach so far were:
> > > > * It is easier for the optimizer to identify a built-in table function
> > > >   than to analyze and reason about a generic VIEW.
> > > > * We would need to make the optimizer a lot smarter to infer all the
> > > >   properties from the generic VIEW definition that we need for a
> > > >   temporal table join.
> > > > * Passing a parameter to a function is a known thing, passing a
> > > >   parameter to a VIEW not so much.
> > > > * Users would need to specify the VIEW exactly right, such that it can
> > > >   be used as a temporal table. Look at 1.1 for why this is not
> > > >   trivial.
> > > >
> > > > There are two ways to use a TableFunction:
> > > >
> > > > ### 1.3.1 Built-in and pre-registered function that is parameterized
> > > > in the SQL query
> > > >
> > > > Here, we do not need to do anything to register the function. We
> > > > simply use it in the query (see example in 2.2 below)
> > > >
> > > > ### 1.3.2 Parameterize function when it is registered in the catalog
> > > > (with a provided Java implementation)
> > > >
> > > > This is the approach we've used so far. In the Table API, the function
> > > > is first parameterized and created and then registered.
> > > > We would need a DDL syntax to parameterize UDFs on registration.
> > > > I don't want to propose a syntax here, but just to get an idea it
> > > > might look like this:
> > > >
> > > > CREATE FUNCTION rates AS
> > > > 'org.apache.flink.table.udfs.TemporalTableFunction' WITH ('table' =
> > > > 'rates_history', 'key' = 'currency', 'time' = 'rowtime')
> > > >
> > > > Right now, the Flink Catalog interface does not have the functionality
> > > > to store such parameters and would need some hacks to properly create
> > > > parameterized function instances.
> > > >
> > > >
> > > > # 2 Defining a join of an append-only table and a temporal table
> > > >
> > > > The append-only table needs to have a time-attribute (processing time
> > > > or event time, but the same as the temporal table).
> > > > The join then needs to specify two things:
> > > > * an equality predicate that includes the primary key of the temporal
> > > >   table
> > > > * the time attribute of the append-only table as the time as of which
> > > >   to look up the temporal table, i.e., get the version of the temporal
> > > >   table that is valid for the timestamp of the current row from the
> > > >   append-only table
> > > >
> > > > The tricky part (from a syntax point of view) is to specify the lookup
> > > > time.
> > > >
> > > > ## 2.1 the temporal table is a regular table or view (see approaches
> > > > 1.1 and 1.2 above)
> > > >
> > > > In this case we can use the "FOR SYSTEM_TIME AS OF x" clause as
> > > > follows:
> > > >
> > > > SELECT *
> > > > FROM orders o, rates r FOR SYSTEM_TIME AS OF o.ordertime
> > > > WHERE o.currency = r.currency
> > > >
> > > > IMO, this is a great syntax and the one we should strive for.
> > > > We would need to bend the rules of the SQL standard, which only allows
> > > > x in "FOR SYSTEM_TIME AS OF x" to be a constant, and the table on
> > > > which it is applied usually needs to be of a specific type (not sure
> > > > if views are supported), but I guess this is fine.
> > > > NOTE: the "FOR SYSTEM_TIME AS OF x" is already supported for
> > > > LookupTable Joins if x is a processing time attribute [2].
> > > >
> > > > ## 2.2 the temporal table is a TableFunction and parameterized in the
> > > > query (see 1.3.1 above)
> > > >
> > > > SELECT *
> > > > FROM orders o,
> > > >   TEMPORAL_TABLE(
> > > >     table => TABLE(rates_history),
> > > >     key => DESCRIPTOR(currency),
> > > >     time => DESCRIPTOR(rowtime)) r
> > > > WHERE o.currency = r.currency
> > > >
> > > > The function "TEMPORAL_TABLE" is built-in and nothing was registered
> > > > in the catalog (except the rates_history table).
> > > > In fact this is valid SQL:2016 syntax and called Polymorphic Table
> > > > Functions. Have a look here [3].
> > > >
> > > > ## 2.3 the temporal table is a TableFunction that was parameterized
> > > > during registration (see 1.3.2 above)
> > > >
> > > > This is what we have at the moment.
> > > >
> > > > SELECT *
> > > > FROM orders o,
> > > >   LATERAL TABLE (rates(o.ordertime)) r
> > > > WHERE o.currency = r.currency
> > > >
> > > > The TableFunction "rates" was registered in the catalog and
> > > > parameterized to the "rates_history" append-only table, the key was
> > > > set to "currency", and the time attribute was declared.
> > > >
> > > > # SUMMARY
> > > >
> > > > IMO we should in the long run aim to define temporal tables either as
> > > > upsert retraction tables or as views on append-only tables and join
> > > > them using the "FOR SYSTEM_TIME AS OF x" syntax.
> > > > I guess it is debatable whether we need to declare that history is
> > > > tracked for these tables (which we don't actually do) or if we do it
> > > > by convention if the table has a time attribute.
> > > > It should be (relatively) easy to get this to work for retraction
> > > > tables which will be supported soon.
> > > > It will be more work for views because we need to improve the time
> > > > attribute handling with MAX() aggregations.
> > > > The "FOR SYSTEM_TIME AS OF x" is already supported for
> > > > LookupTableSources and would "only" need to be adapted to work on
> > > > temporal tables.
> > > >
> > > > Registering parameterized TableFunctions in the catalog seems like
> > > > quite a bit of work. We need new DDL syntax, and have to extend the
> > > > catalog and function instantiation. This won't be easy, IMO.
> > > > If we only support them as TEMPORARY FUNCTIONs which are not
> > > > registered in the catalog it will be easier. The question is whether
> > > > it is worth the effort if we decide for the other approach.
> > > >
> > > > Using TableFunctions that are parameterized in the query will require
> > > > extending the Calcite parser and framework to support Polymorphic
> > > > Table Functions.
> > > > However, some work might already have been done there, because AFAIK
> > > > Apache Beam aims to support this syntax for windowing functions as
> > > > described in the "One SQL to rule them all" paper [4].
> > > > It might be the fastest and fully SQL standard compliant way.
> > > >
> > > > Cheers,
> > > > Fabian
> > > >
> > > > [1] https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables
> > > > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#usage-1
> > > > [3] https://standards.iso.org/ittf/PubliclyAvailableStandards/c069776_ISO_IEC_TR_19075-7_2017.zip
> > > > [4] https://arxiv.org/abs/1905.12133
> > > >
> > > > On Fri, 17 Apr 2020 at 06:37, Jark Wu <imj...@gmail.com> wrote:
> > > >
> > > >> Hi Konstantin,
> > > >>
> > > >> Thanks for bringing up this discussion. I think temporal join is a
> > > >> very important feature and should be exposed to pure SQL users.
> > > >> I have already received many requests like this.
> > > >> However, my concern is how to properly support this feature in SQL.
> > > >> Introducing a DDL syntax for Temporal Table Function is one way, but
> > > >> maybe not the best one.
> > > >>
> > > >> The most important reason is that what underlies a temporal table
> > > >> function is exactly a changelog stream.
> > > >> The temporal join is actually joining a fact stream with the
> > > >> changelog stream on processing time or event time.
> > > >> We will soon support creating a changelog source using DDL once
> > > >> FLIP-95 and FLIP-105 are finished.
> > > >> At that time, we can have a simple DDL to create a changelog source
> > > >> like this:
> > > >>
> > > >> CREATE TABLE rate_changelog (
> > > >>   currency STRING,
> > > >>   rate DECIMAL
> > > >> ) WITH (
> > > >>   'connector' = 'kafka',
> > > >>   'topic' = 'rate_binlog',
> > > >>   'properties.bootstrap.servers' = 'localhost:9092',
> > > >>   'format' = 'debezium-json'
> > > >> );
> > > >>
> > > >> In the meantime, we already have a SQL standard temporal join syntax
> > > >> [1], i.e. the "A JOIN B FOR SYSTEM_TIME AS OF ..".
> > > >> It is currently used for dimension table lookup joins, but the
> > > >> semantics are the same as for the "temporal table function join" [2].
> > > >> I'm in favor of "FOR SYSTEM_TIME AS OF" because it is more natural:
> > > >> the definition of B is a *table*, not a *table function*,
> > > >> and the syntax is included in the SQL standard.
> > > >>
> > > >> So once we have the ability to define the "rate_changelog" table, we
> > > >> can use the following query to temporally join the changelog on
> > > >> processing time.
> > > >>
> > > >> SELECT *
> > > >> FROM orders JOIN rate_changelog FOR SYSTEM_TIME AS OF orders.proctime
> > > >> ON orders.currency = rate_changelog.currency;
> > > >>
> > > >> In a nutshell, once FLIP-95 and FLIP-105 are ready, we can easily
> > > >> support "temporal join on changelogs" without introducing new syntax.
> > > >> IMO, introducing a DDL syntax for Temporal Table Function does not
> > > >> look like an easy way and may involve repetitive work.
> > > >>
> > > >> Best,
> > > >> Jark
> > > >>
> > > >> [1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
> > > >> [2]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function
> > > >>
> > > >> On Thu, 16 Apr 2020 at 23:04, Benchao Li <libenc...@gmail.com> wrote:
> > > >>
> > > >>> Hi Konstantin,
> > > >>>
> > > >>> Thanks for bringing up this discussion. +1 for the idea.
> > > >>> We have run into this in our company too, and I recently planned to
> > > >>> support it in our internal branch.
> > > >>>
> > > >>> Regarding your questions:
> > > >>> 1) I think it might be more of a table/view than a function, just
> > > >>> like a Temporal Table (which is also known as a dimension table).
> > > >>> Maybe we need a DDL like CREATE VIEW plus some additional settings.
> > > >>> 2) If we design the DDL for it like a view, then maybe temporary is
> > > >>> good enough.
> > > >>>
> > > >>> Konstantin Knauf <kna...@apache.org> wrote on Thu, 16 Apr 2020 at 20:16:
> > > >>>
> > > >>>> Hi everyone,
> > > >>>>
> > > >>>> it would be very useful if temporal tables could be created via
> > > >>>> DDL. Currently, users either need to do this in the Table API or in
> > > >>>> the environment file of the Flink CLI, which both require the user
> > > >>>> to switch the context of the SQL CLI/Editor. I recently created a
> > > >>>> ticket for this request [1].
> > > >>>>
> > > >>>> I see two main questions:
> > > >>>>
> > > >>>> 1) What would be the DDL syntax? A Temporal Table is on the one
> > > >>>> hand a view and on the other a function depending on how you look
> > > >>>> at it.
> > > >>>>
> > > >>>> 2) Would this temporal table view/function be stored in the catalog
> > > >>>> or only be temporary?
> > > >>>>
> > > >>>> I personally do not have much experience in this area of Flink, so
> > > >>>> I am looking forward to hearing your thoughts on this.
> > > >>>>
> > > >>>> Best,
> > > >>>>
> > > >>>> Konstantin
> > > >>>>
> > > >>>> [1] https://issues.apache.org/jira/browse/FLINK-16824
> > > >>>>
> > > >>>> --
> > > >>>>
> > > >>>> Konstantin Knauf
> > > >>>
> > > >>> --
> > > >>>
> > > >>> Benchao Li
> > > >>> School of Electronics Engineering and Computer Science, Peking University
> > > >>> Tel:+86-15650713730
> > > >>> Email: libenc...@gmail.com; libenc...@pku.edu.cn