Hi Kurt, Fabian,

After an offline discussion with Jark, we think that the 'PERIOD FOR SYSTEM_TIME(operation_time)' statement might be needed now. A changelog table is a superset of an insert-only table; relying on PRIMARY KEY and rowtime may work well for an insert-only or upsert source, but it has some problems for a changelog table.
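To make this concrete, here is a minimal sketch of what such a DDL could look like. This is for illustration only: the PERIOD FOR SYSTEM_TIME clause does not exist in Flink today, and the table name currency_rates and the operation_time column are made up for this example; the exact syntax is not a final proposal.

CREATE TABLE currency_rates (
  currency CHAR(3) NOT NULL PRIMARY KEY,
  rate DOUBLE,
  gmt_modified TIMESTAMP(3),      -- rowtime carried by the original DB record
  operation_time TIMESTAMP(3),    -- e.g. Debezium's "ts_ms": when the DML happened in the DB
  WATERMARK FOR gmt_modified AS gmt_modified - INTERVAL '5' SECOND,
  PERIOD FOR SYSTEM_TIME (operation_time)   -- proposed clause: track row validity by operation time
) WITH (...);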
'PERIOD FOR SYSTEM_TIME(operation_time)' in a temporal table defines/maintains the valid time of each row; the rowtime cannot play the history-tracking role well.

# 1. operation time (version time) vs rowtime (watermark)

Let me take an example to explain. The following changelog records came from a database table captured by the Debezium tool:

{
  "before": null,
  "after": {"currency": "Euro", "rate": 114, "gmt_modified": "12:00:01"},
  "op": "c",  // INSERT
  "ts_ms": 1592971201000  // 2020-06-24 12:00:02
}
{
  "before": {"currency": "Euro", "rate": 114, "gmt_modified": "12:00:05"},
  "after": {"currency": "Euro", "rate": 118, "gmt_modified": "12:00:05"},
  "op": "u",  // UPDATE
  "ts_ms": 1592971206000  // 2020-06-24 12:00:06
}
{
  "before": {"currency": "Euro", "rate": 118, "gmt_modified": "12:00:05"},
  "after": null,
  "op": "d",  // DELETE
  "ts_ms": 1593000011000  // 2020-06-24 20:00:11
}

The rowtime should be the "gmt_modified" field that belongs to the original record; the "ts_ms" is the operation time at which the DML statement happened in the DB. For the DELETE changelog record, its "gmt_modified" field (12:00:05) cannot reflect the real operation time (20:00:11).

In the temporal join case, we should maintain the valid time of each row. For a DELETE event, we should use the operation time of the DELETE as the "end time" of the row. That is, the record {"currency": "Euro", "rate": 118} no longer exists after "20:00:11", not after "12:00:05".

If we used rowtime to track the history versions, we could not access the record {"currency": "Euro", "rate": 118, "gmt_modified": "12:00:05"} once the rowtime is bigger than (12:00:05), because the DELETE changelog record also carries rowtime (12:00:05) and would clear the record from state. The expected result is that the record only expires at (20:00:11), when it is actually deleted, rather than at its last update time (12:00:05) in the materialized state.

From this case, I think rowtime and operation time should be orthogonal in the temporal table scenario. The operation time should be strictly monotonically increasing (no out-of-order records) and should only be used to track the history versions of a changelog table; every history version of a changelog table equals a database table snapshot due to the stream-table duality.

# 2. The semantics of rowtime and watermark on a changelog

The rowtime and watermark can also be defined on a changelog table, just like on any other source backed by a queue. Flink supports cascaded window aggregation in SQL like:

SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND), MAX(rate) AS rate
FROM (
  SELECT MAX(rate) AS rate, TUMBLE_ROWTIME(rowtime, INTERVAL '5' SECOND) AS `rowtime`
  FROM currency
  GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND)
)
GROUP BY TUMBLE(rowtime, INTERVAL '60' SECOND)

We can think of the output of the first window aggregation as a changelog source of the second window aggregation. There are INSERT/UPDATE/DELETE messages and also watermarks in that changelog stream, and the rowtime in the changelog stream is the `TUMBLE_ROWTIME` value (just like the `gmt_modified` column in the DB).

# summary

We should use the 'PERIOD FOR SYSTEM_TIME(operation_time)' syntax to track history versions by operation time rather than by rowtime in the temporal table scenario. We can still support defining a rowtime (watermark) on a changelog table, but the rowtime will not be used to track the history of the changelog stream. A short join sketch follows below to illustrate this.

WDYT? Please correct me if I am wrong.
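For illustration, a sketch of a temporal join against the hypothetical currency_rates table sketched above (the orders table and its columns are likewise made up for this example):

SELECT o.order_id, o.price * r.rate AS converted_price
FROM orders AS o
JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;

With PERIOD FOR SYSTEM_TIME(operation_time), an order whose order_time is 15:00 on 2020-06-24 would still join against {"currency": "Euro", "rate": 118}, because that version is only closed by the DELETE at 20:00:11; if we tracked the history by rowtime, that version would already have been removed after 12:00:05.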
Best, Leonard > 在 2020年6月24日,11:31,Leonard Xu <xbjt...@gmail.com> 写道: > > Hi, everyone > > Thanks Fabian,Kurt for making the multiple version(event time) clear, I also > like the 'PERIOD FOR SYSTEM' syntax which supported in SQL standard. I think > we can add some explanation of the multiple version support in the future > section of FLIP. > > For the PRIMARY KEY semantic, I agree with Jark's point that the semantic > should unify both on changelog source and insert-only source. > > Currently, Flink supports PRIMARY KEY after FLIP-87, Flink uses PRIMARY KEY > NOT ENFORCED because Flink does not own the data like other DBMS therefore > Flink won't validate/enforce the key integrity and only trusts the external > systems. It is expected user and external system/application should make > sure no deduplicated records happened when using NOT ENFORCED. > > (a) For PRIMARY KEY NOT ENFORCED semantic on changelog source: > It means the materialized changelogs (INSERT/UPDATE/DELETE) should be unique > on the primary key constraints.Flink assumes messages are in order on the > primary key. Flink will use the PRIMARY KEY for some optimization, e.g. use > the PRIMARY KEY to update the materialized state by key in temporal join > operator. > > (b) For PRIMARY KEY NOT ENFORCED semantic on insert-only source: > It means records should be unique on the primary key constraints. If there > are INSERT records with duplicate primary key columns, the result of SQL > query might be nondeterministic because it broken the PRIMARY KEY constraints. > > Cheers, > Leonard > > >> 在 2020年6月23日,23:35,Fabian Hueske <fhue...@gmail.com >> <mailto:fhue...@gmail.com>> 写道: >> >> Thanks Kurt, >> >> Yes, you are right. >> The `PERIOD FOR SYSTEM_TIME` that you linked before corresponds to the >> VERSION clause that I used and would explicitly define the versioning of a >> table. >> I didn't know that the `PERIOD FOR SYSTEM_TIME` cause is already defined by >> the SQL standard. >> I think we would need a slightly different syntax though because (so far) >> the validity of a row is determined by its own timestamp and the timestamp >> of the next row. >> >> Adding a clause later solves the ambiguity issue for tables with multiple >> event-time attributes. >> However, I'd feel more comfortable having such a cause and an explicit >> definition of the temporal property from the beginning. >> I guess this is a matter of personal preference so I'll go with the >> majority if we decide that every table that has a primary key and an >> event-time attribute should be usable in an event-time temporal table join. >> >> Thanks, Fabian >> >> >> Am Di., 23. Juni 2020 um 16:58 Uhr schrieb Kurt Young <ykt...@gmail.com >> <mailto:ykt...@gmail.com>>: >> >>> Hi Fabian, >>> >>> I agree with you that implicitly letting event time to be the version of >>> the table will >>> work in most cases, but not for all. That's the reason I mentioned `PERIOD >>> FOR` [1] >>> syntax in my first email, which is already in sql standard to represent the >>> validity of >>> each row in the table. >>> >>> If the event time can't be used, or multiple event time are defined, we >>> could still add >>> this syntax in the future. >>> >>> What do you think? 
>>> >>> [1] >>> >>> https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver15 >>> >>> <https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver15> >>> Best, >>> Kurt >>> >>> >>> On Tue, Jun 23, 2020 at 9:12 PM Fabian Hueske <fhue...@gmail.com> wrote: >>> >>>> Hi everyone, >>>> >>>> Every table with a primary key and an event-time attribute provides what >>> is >>>> needed for an event-time temporal table join. >>>> I agree that, from a technical point of view, the TEMPORAL keyword is not >>>> required. >>>> >>>> I'm more sceptical about implicitly deriving the versioning information >>> of >>>> a (temporal) table as the table's only event-time attribute. >>>> In the query >>>> >>>> SELECT * >>>> FROM orders o, rates r FOR SYSTEM_TIME AS OF o.ordertime >>>> WHERE o.currency = r.currency >>>> >>>> the syntax of the temporal table join does not explicitly reference the >>>> version of the temporal rates table. >>>> Hence, the system needs a way to derive the version of temporal table. >>>> >>>> Implicitly using the (only) event-time attribute of a temporal table >>> (rates >>>> in the example above) to identify the right version works in most cases, >>>> but probably not in all. >>>> * What if a table has more than one event-time attribute? (TableSchema is >>>> designed to support multiple watermarks; queries with interval joins >>>> produce tables with multiple event-time attributes, ...) >>>> * What if the table does not have an event-time attribute in its schema >>> but >>>> the version should only be provided as meta data? >>>> >>>> We could add a clause to define the version of a table, such as: >>>> >>>> CREATE TABLE rates ( >>>> currency CHAR(3) NOT NULL PRIMARY KEY, >>>> rate DOUBLE, >>>> rowtime TIMESTAMP, >>>> WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE), >>>> VERSION (rowtime) >>>> WITH (...); >>>> >>>> The presence of a the VERSION clause (or whatever syntax) would >>> explicitly >>>> define the version of a (temporal) table. >>>> It would also render the need for the TEMPORAL keyword superfluous >>> because >>>> there would be another indicator that a table can be used in a temporal >>>> table join. >>>> >>>> I'm OK with not adding the TEMPORAL keyword, but I recommend that we >>> think >>>> again about the proposed implicit definition of a table's version and how >>>> it might limit use in the future. >>>> >>>> Cheers, >>>> Fabian >>>> >>>> Am Mo., 22. Juni 2020 um 16:14 Uhr schrieb Jark Wu <imj...@gmail.com>: >>>> >>>>> I'm also +1 for not adding the TEMPORAL keyword. >>>>> >>>>> +1 to make the PRIMARY KEY semantic clear for sources. >>>>> From my point of view: >>>>> >>>>> 1) PRIMARY KEY on changelog souruce: >>>>> It means that when the changelogs (INSERT/UPDATE/DELETE) are >>>> materialized, >>>>> the materialized table should be unique on the primary key columns. >>>>> Flink assumes messages are in order on the primary key. Flink doesn't >>>>> validate/enforces the key integrity, but simply trust it (thus NOT >>>>> ENFORCED). >>>>> Flink will use the PRIMARY KEY for some optimization, e.g. use the >>>> PRIMARY >>>>> KEY to update the materilized state by key in temporal join operator. >>>>> >>>>> 2) PRIMARY KEY on insert-only source: >>>>> I prefer to have the same semantic to the batch source and changelog >>>>> source, that it implies that records are not duplicate on the primary >>>> key. >>>>> Flink just simply trust the primary key constraint, and doesn't valid >>> it. 
>>>>> If there is duplicate primary keys with INSERT changeflag, then result >>> of >>>>> Flink query might be wrong. >>>>> >>>>> If this is a TEMPORAL TABLE FUNCTION scenario, that source emits >>>> duplicate >>>>> primary keys with INSERT changeflag, when we migrate this case to >>>> temporal >>>>> table DDL, >>>>> I think this source should emit INSERT/UPDATE (UPSERT) messages instead >>>> of >>>>> INSERT-only messages, e.g. a Kafka compacted topic source? >>>>> >>>>> Best, >>>>> Jark >>>>> >>>>> >>>>> On Mon, 22 Jun 2020 at 17:04, Konstantin Knauf <kna...@apache.org> >>>> wrote: >>>>> >>>>>> Hi everyone, >>>>>> >>>>>> I also agree with Leonard/Kurt's proposal for CREATE TEMPORAL TABLE. >>>>>> >>>>>> Best, >>>>>> >>>>>> Konstantin >>>>>> >>>>>> On Mon, Jun 22, 2020 at 10:53 AM Kurt Young <ykt...@gmail.com> >>> wrote: >>>>>> >>>>>>> I agree with Timo, semantic about primary key needs more thought >>> and >>>>>>> discussion, especially after FLIP-95 and FLIP-105. >>>>>>> >>>>>>> Best, >>>>>>> Kurt >>>>>>> >>>>>>> >>>>>>> On Mon, Jun 22, 2020 at 4:45 PM Timo Walther <twal...@apache.org> >>>>> wrote: >>>>>>> >>>>>>>> Hi Leonard, >>>>>>>> >>>>>>>> thanks for the summary. >>>>>>>> >>>>>>>> After reading all of the previous arguments and working on >>>> FLIP-95. I >>>>>>>> would also lean towards the conclusion of not adding the TEMPORAL >>>>>>> keyword. >>>>>>>> >>>>>>>> After FLIP-95, what we considered as a CREATE TEMPORAL TABLE can >>> be >>>>>>>> represented as a CREATE TABLE with PRIMARY KEY and WATERMARK. The >>>> FOR >>>>>>>> SYSTEM_TIME AS OF t would trigger the internal materialization >>> and >>>>>>>> "temporal" logic. >>>>>>>> >>>>>>>> However, we should discuss the meaning of PRIMARY KEY again in >>> this >>>>>>>> case. In a TEMPORAL TABLE scenario, the source would emit >>> duplicate >>>>>>>> primary keys with INSERT changeflag but at different point in >>> time. >>>>>>>> Currently, we require a PRIMARY KEY NOT ENFORCED declaration. The >>>>>>>> changelog semantics of FLIP-95 and FLIP-105 don't work well with >>> a >>>>>>>> primary key declaration. >>>>>>>> >>>>>>>> Regards, >>>>>>>> Timo >>>>>>>> >>>>>>>> >>>>>>>> On 20.06.20 17:08, Leonard Xu wrote: >>>>>>>>> Hi everyone, >>>>>>>>> >>>>>>>>> Thanks for the nice discussion. I’d like to move forward the >>>> work, >>>>>>>> please let me simply summarize the main opinion and current >>>>>> divergences. >>>>>>>>> >>>>>>>>> 1. The agreements have been achieved: >>>>>>>>> >>>>>>>>> 1.1 The motivation we're discussing temporal table DDL is just >>>> for >>>>>>>> creating temporal table in pure SQL to replace pre-process >>> temporal >>>>>> table >>>>>>>> in YAML/Table API for usability. >>>>>>>>> 1.2 The reason we use "TEMPORAL" keyword rather than “PERIOD >>> FOR >>>>>>>> SYSTEM_TIME” is to make user understand easily. >>>>>>>>> 1.3 For append-only table, it can convert to changelog table >>>> which >>>>>> has >>>>>>>> been discussed in FLIP-105, we assume the following temporal >>> table >>>> is >>>>>>> comes >>>>>>>> from changelog (Jark, fabian, Timo). >>>>>>>>> 1.4 For temporal join syntax, using "FOR SYSTEM_TIME AS OF x" >>>>> instead >>>>>>> of >>>>>>>> the current `LATERAL TABLE(rates(x))` has come to an >>>>> agreement(Fabian, >>>>>>>> Timo, Seth, Konstantin, Kurt). >>>>>>>>> >>>>>>>>> 2. 
The small divergence : >>>>>>>>> >>>>>>>>> About the definition syntax of the temporal table, >>>>>>>>> >>>>>>>>> CREATE [TEMPORAL] TABLE rates ( >>>>>>>>> currency CHAR(3) NOT NULL PRIMARY KEY, >>>>>>>>> rate DOUBLE, >>>>>>>>> rowtime TIMESTAMP, >>>>>>>>> WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE) >>>>>>>>> WITH (...); >>>>>>>>> >>>>>>>>> there is small divergence whether add "TEMPORAL" keyword or >>> not. >>>>>>>>> >>>>>>>>> 2.1 one opinion is using "CREATE TEMPORAL TABLE" (Timo, >>> Fabian, >>>>>> Seth), >>>>>>>> the main advantages are: >>>>>>>>> (1)"TEMPORAL" keyword is intuitive to indicate the history >>>> tracking >>>>>>>> semantics. >>>>>>>>> (2)"TEMPORAL" keyword illustrates that queries can visit the >>>>> previous >>>>>>>> versions of a table like other DBMS use "PERIOD FOR SYSTEM_TIME" >>>>>> keyword. >>>>>>>>> >>>>>>>>> 2.2 the other is using "CREATE TABLE"(Kurt), the main >>> advantages >>>>> are: >>>>>>>>> (1)Just primary key and time attribute can track previous >>>> versions >>>>>> of a >>>>>>>> table well. >>>>>>>>> (2)The temporal behavior is triggered by temporal join syntax >>>>> rather >>>>>>>> than in DDL, all Flink DDL table are dynamic table logically >>>>> including >>>>>>>> temporal table. If we decide to use "TEMPORAL" keyword and treats >>>>>>> changelog >>>>>>>> as temporal table, other tables backed queue like Kafka should >>> also >>>>> use >>>>>>>> "TEMPORAL" keyword. >>>>>>>>> >>>>>>>>> >>>>>>>>> IMO, the statement “CREATE TEMPORARY TEMPORAL TABLE...” follows >>>>> with >>>>>>> 2.1 >>>>>>>> may confuse users much. If we take a second to think about, for >>>>>>> source/sink >>>>>>>> table which may backed queue (like kafka) or DB (like MySQL), we >>>> did >>>>>> not >>>>>>>> add any keyword in DDL to specify they are source or sinks, it >>>> works >>>>>>> well. >>>>>>>>> I think temporal table is the third one, kafka data source and >>>> DB >>>>>> data >>>>>>>> source can play as a source/sink/temporal table depends on the >>>>>>>> position/syntax that user put them in the query. The above rates >>>>> table >>>>>>>>> - can be a source table if user put it at `SELECT * FROM >>>>> rates;` >>>>>>>>> - can be a temporal table if user put it at `SELECT * FROM >>>>>> orders >>>>>>>> JOIN rates FOR SYSTEM_TIME AS OF orders.proctime >>>>>>>>> ON orders.currency = rates.currency;` >>>>>>>>> - can be sink table if user put is at `INSERT INTO rates >>>>> SELECT >>>>>> * >>>>>>>> FROM …; ` >>>>>>>>> From these cases, we found all tables defined in Flink should >>> be >>>>>>>> dynamic table logically, the source/sink/temporal role depends on >>>> the >>>>>>>> position/syntax in user’s query. >>>>>>>>> In fact we have used similar syntax for current lookup >>>>> table, >>>>>> we >>>>>>>> didn’t add “LOOKUP" or “TEMPORAL" keyword for lookup table and >>>>> trigger >>>>>>> the >>>>>>>> temporal join from the position/syntax(“FOR SYSTEM_TIME AS OF x") >>>> in >>>>>>> query. >>>>>>>>> >>>>>>>>> So, I prefer to resolve the small divergence with “CREATE >>> TABLE” >>>>>> which >>>>>>>>> (1) is more unified with our source/sink/temporal dynamic table >>>>>>>> conceptually, >>>>>>>>> (2) is aligned with current lookup table, >>>>>>>>> (3) also make users learn less keyword. >>>>>>>>> >>>>>>>>> WDYT? >>>>>>>>> >>>>>>>>> Best, >>>>>>>>> Leonard Xu >>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> >>>>>> Konstantin Knauf >>>>>> >>>>>> https://twitter.com/snntrable >>>>>> >>>>>> https://github.com/knaufk >>>>>> >>>>> >>>> >>> >