Hi Leonard,

Thank you for the summary. I don't fully understand the implications of (3). Would we support a temporal join against a changelog stream with event-time semantics by ignoring DELETE messages, or would it be completely unsupported? I mean something like the following sequence of statements:

CREATE TABLE currency_rates (
  currencyId BIGINT PRIMARY KEY,
  rate DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'format' = 'debezium-json'
);

CREATE TABLE transactions (
  currencyId BIGINT,
  transactionTime TIMESTAMP(3)
) WITH ( );

SELECT ...
FROM transactions AS t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transactionTime AS r
ON r.currencyId = t.currencyId;

Cheers,

Konstantin

On Fri, Jul 3, 2020 at 4:52 PM Leonard Xu <xbjt...@gmail.com> wrote:

> Thanks Jingsong, Jark, Knauf, Seth for sharing your thoughts.
>
> Although we discussed many details about the concept, I think it's worth
> clarifying the semantics in light of our long-term goals. The temporal
> table concept was first introduced in SQL:2011. I investigated how
> temporal tables work in traditional DBMSs that implement them, such as
> SQL Server [1] and PostgreSQL [2].
>
> In a DBMS, a temporal table is implemented as a pair of tables, *a current
> table and a history table*. The current table contains the current value
> of each row; the history table contains each previous value of each row.
> Each row carries a time range, built from RowStartTime and RowEndTime,
> that defines the validity period of the row.
> The RowStartTime and RowEndTime are updated by the DBMS whenever a DML
> operation happens. Here is a simple temporal table in SQL Server to show
> how it works:
>
> CREATE TABLE dbo.currency (
>   [currency] VARCHAR(10) NOT NULL PRIMARY KEY,
>   [rate] INT,
>   [RowStart] DATETIME2 GENERATED ALWAYS AS ROW START,
>   [RowEnd] DATETIME2 GENERATED ALWAYS AS ROW END,
>   PERIOD FOR SYSTEM_TIME (RowStart, RowEnd)
> ) WITH (SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.currency_History));
>
> // The initial test data; RowEnd is the max value of the timestamp type
> 1> select * from currency;
> currency   rate  RowStart                     RowEnd
> ---------- ----- ---------------------------- ----------------------------
> Euro       114   2020-06-29 15:06:24.7459246  9999-12-31 23:59:59.9999999
> US Dollar  102   2020-06-29 15:06:24.7503288  9999-12-31 23:59:59.9999999
>
> // UPDATE the Euro currency
> 1> UPDATE dbo.currency SET [rate] = 118 WHERE currency = 'Euro';
>
> // The history table gained a record that represents the validity
> // period of record (Euro, 114)
> 2> select * from currency_History;
> currency   rate  RowStart                     RowEnd
> ---------- ----- ---------------------------- ----------------------------
> Euro       114   2020-06-29 15:06:24.7459246  2020-06-29 15:07:01.1245406
>
> // DELETE the Euro currency
> 1> DELETE FROM dbo.currency WHERE currency = 'Euro';
>
> // The history table gained another record that represents the validity
> // period of record (Euro, 118)
> 1> select * from currency_History;
> currency   rate  RowStart                     RowEnd
> ---------- ----- ---------------------------- ----------------------------
> Euro       114   2020-06-29 15:06:24.7459246  2020-06-29 15:07:01.1245406
> Euro       118   2020-06-29 15:07:01.1245406  2020-06-29 15:07:11.2981995
>
> // The current table only keeps the latest value
> 1> select * from currency;
> currency   rate  RowStart                     RowEnd
> ---------- ----- ---------------------------- ----------------------------
> US Dollar  102   2020-06-29 15:06:24.7503288  9999-12-31 23:59:59.9999999
>
> The history table is essential for tracking historical versions. Please
> note that the *DELETE* operation also adds a record to the history table,
> and that record's RowEnd is the system time at which the DELETE happened.
> In short, a temporal table uses the time range [RowStart, RowEnd) to mark
> the validity period, stores all versions of each record in the history
> table for history tracking, and uses the DBMS operation time to set
> RowStart and RowEnd.
>
> Back to our Flink world: a temporal table with an event-time attribute
> currently works well on data sources that contain INSERT and UPDATE
> messages, but not DELETE.
> Let us see what happens in the DELETE message scenario (i.e. a changelog
> source). Changes to both DBMS temporal tables and ordinary tables can be
> captured by CDC tools and have the same format. I used Debezium to
> capture the changes of a SQL Server table:
>
> 1> select * from currency;
> currency   rate  RowStart                     RowEnd
> ---------- ----- ---------------------------- ----------------------------
> Euro       118   2020-06-29 15:07:01.1245406  9999-12-31 23:59:59.9999999
> US Dollar  102   2020-06-29 15:06:24.7503288  9999-12-31 23:59:59.9999999
>
> // DELETE the Euro currency
> 1> DELETE FROM dbo.currency WHERE currency = 'Euro';
>
> 1> select * from currency_History;
> currency   rate  RowStart                     RowEnd
> ---------- ----- ---------------------------- ----------------------------
> Euro       118   2020-06-29 15:07:01.1245406  2020-06-29 15:07:11.2981995
>
> // The DELETE record produced by CDC tools (Debezium and Canal behave
> // the same)
> {
>   "before": {
>     "currency": "Euro",
>     "rate": 118,
>     "RowStart": 1593443221124540600, // 2020-06-29 15:07:01.1245406
>     "RowEnd": -4852116231933722724   // 9999-12-31 23:59:59.9999999
>   },
>   "after": null,
>   "op": "d", // DELETE operation
>   // 2020-06-29 15:07:16.354; the 'ts_ms' value is later than the
>   // record's delete operation time (2020-06-29 15:07:11.2981995)
>   "ts_ms": 1593443236354,
>   "transaction": null
> }
>
> The main problem is that the *DELETE* record only carries the row as it
> appeared in the current table, so it does not contain the RowEnd
> (*2020-06-29 15:07:11.2981995*) that the history table records. Without
> the exact RowEndTime, it is impossible to reconstruct the exact previous
> version of the temporal table in Flink. The `ts_ms` field in the CDC
> record only approximates RowEndTime: it depends on the state of the CDC
> tool and, semantically, cannot be treated as equal to RowEndTime.
>
> The current Temporal Table Function supports:
> (1) Defining a temporal table backed by an upsert data source with
> processing time
> (2) Defining a temporal table backed by an upsert data source with
> event time
>
> I think the proposed temporal table could currently support:
> (1) Defining a temporal table backed by an upsert (including delete)
> data source with processing time
> (2) Defining a temporal table backed by an upsert data source with
> event time
> (3) No support for defining a temporal table with event time on a data
> source that contains DELETE messages, because most CDC tools currently
> cannot obtain the exact DELETE operation time. The "ts_ms" field from
> the metadata is only an approximate time, which would break event-time
> semantics. We can add support once CDC tools are able to obtain/extract
> the DML operation time.
>
> Jingsong, Jark, Kurt and I reached consensus on this after an offline
> discussion, and the opinions from Knauf and Seth appear to match ours.
>
> I'll prepare a design doc for temporal tables. Thanks to everyone
> involved, and please let me know if you have any concerns.
>
> Best,
> Leonard Xu
>
> [1] https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver15
> [2] https://pgxn.org/dist/temporal_tables/

-- 
Konstantin Knauf
https://twitter.com/snntrable
https://github.com/knaufk