Hi Leonard,

Thank you for the summary. I don't fully understand the implications of
(3). Would we support a temporal join against a changelog stream with
event time semantics by ignoring DELETE messages, or would it be completely
unsupported? I mean something like the following sequence of statements:

CREATE TABLE currency_rates (
  currencyId BIGINT PRIMARY KEY,
  rate DECIMAL(10, 2)) WITH (
 'connector' = 'kafka',
 'format' = 'debezium-json'
);

CREATE TABLE transactions (
  currencyId BIGINT,
  transactionTime TIMESTAMP(3)) WITH (

);


SELECT
  ...
FROM
  transactions AS t
  JOIN currency_rates FOR SYSTEM_TIME AS OF t.transactionTime AS r
  ON r.currencyId = t.currencyId
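
To make the possible readings concrete, here is a minimal Python sketch (not
Flink code). It replays the Euro changelog from your mail and looks up the
rate as of a transaction time. The helper names build_versions and
rate_as_of and the probe time 15:07:13 are made up for illustration, and
the timestamps are truncated to microseconds because that is the precision
Python's datetime supports.

```python
from datetime import datetime


def ts(text):
    """Parse 'YYYY-MM-DD HH:MM:SS.ffffff' into a datetime."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S.%f")


def build_versions(changelog, ignore_deletes=False):
    """Turn (event_time, op, key, rate) changes into [start, end) versions."""
    versions = {}  # key -> list of [start, end, rate]; end is None while open
    for event_time, op, key, rate in sorted(changelog, key=lambda c: c[0]):
        if op == "d" and ignore_deletes:
            continue  # pretend the DELETE never arrived
        history = versions.setdefault(key, [])
        if history and history[-1][1] is None:
            history[-1][1] = event_time  # close the currently valid version
        if op != "d":
            history.append([event_time, None, rate])  # open a new version
    return versions


def rate_as_of(versions, key, when):
    """Return the rate whose [start, end) range contains `when`, else None."""
    for start, end, rate in versions.get(key, []):
        if start <= when and (end is None or when < end):
            return rate
    return None


# Changelog with the exact DELETE time taken from the history table.
exact = [
    (ts("2020-06-29 15:06:24.745924"), "c", "Euro", 114),
    (ts("2020-06-29 15:07:01.124540"), "u", "Euro", 118),
    (ts("2020-06-29 15:07:11.298199"), "d", "Euro", None),
]
# Same changelog, but the DELETE is stamped with the CDC ts_ms time instead.
approx = exact[:2] + [(ts("2020-06-29 15:07:16.354000"), "d", "Euro", None)]

# Hypothetical transaction between the real delete time and ts_ms.
probe = ts("2020-06-29 15:07:13.000000")

print(rate_as_of(build_versions(exact), "Euro", probe))   # None
print(rate_as_of(build_versions(approx), "Euro", probe))  # 118
print(rate_as_of(build_versions(exact, ignore_deletes=True), "Euro", probe))  # 118
```

With the exact delete time the transaction finds no rate. With ts_ms as the
delete time, or with DELETE messages ignored, it still joins against the
already deleted rate of 118. In the "ignore DELETE" case that stays true for
any later transaction as well.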

Cheers,

Konstantin

On Fri, Jul 3, 2020 at 4:52 PM Leonard Xu <xbjt...@gmail.com> wrote:

> Thanks Jingsong, Jark, Knauf, Seth for sharing your thoughts.
>
> Although we have discussed many details of the concept, I think it’s worth
> clarifying the semantics in light of the long-term goals. The temporal
> table concept was first introduced in SQL:2011. I investigated how
> temporal tables work in traditional DBMSs that implement them, such as
> SQL Server[1] and PostgreSQL[2].
>
> In a DBMS, a temporal table is implemented as a pair of tables, *a current
> table and a history table*. The current table contains the current value
> of each row; the history table contains each previous value of each row.
> Each row carries a time range, built from RowStartTime and RowEndTime,
> that defines the row's period of validity. The DBMS updates RowStartTime
> and RowEndTime whenever a DML operation happens. Here is a simple temporal
> table in SQL Server to show how it works:
> CREATE TABLE dbo.currency (
>   [currency] VARCHAR(10) NOT NULL PRIMARY KEY,
>   [rate] INT,
>   [RowStart] DATETIME2 GENERATED ALWAYS AS ROW START,
>   [RowEnd] DATETIME2 GENERATED ALWAYS AS ROW END,
>   PERIOD FOR SYSTEM_TIME (RowStart, RowEnd)
> ) WITH (SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.currency_History));
>
> 1> select * from currency;
> // The initial test data; RowEnd is the max value of the timestamp type
> currency   rate  RowStart                     RowEnd
> ---------- ----- ---------------------------- ----------------------------
> Euro       114   2020-06-29 15:06:24.7459246  9999-12-31 23:59:59.9999999
> US Dollar  102   2020-06-29 15:06:24.7503288  9999-12-31 23:59:59.9999999
>
> 1> UPDATE dbo.currency SET [rate] = 118 WHERE currency = 'Euro';
> // UPDATE Euro currency
> 2> select * from currency_History;
> // The history table gained a record that represents the validity period
> // of record (Euro, 114)
> currency   rate  RowStart                     RowEnd
> ---------- ----- ---------------------------- ----------------------------
> Euro       114   2020-06-29 15:06:24.7459246  2020-06-29 15:07:01.1245406
>
> 1> DELETE FROM dbo.currency WHERE currency = 'Euro';
> // DELETE Euro currency
> 1> select * from currency_History;
> // The history table gained another record that represents the validity
> // period of record (Euro, 118)
> currency   rate  RowStart                     RowEnd
> ---------- ----- ---------------------------- ----------------------------
> Euro       114   2020-06-29 15:06:24.7459246  2020-06-29 15:07:01.1245406
> Euro       118   2020-06-29 15:07:01.1245406  2020-06-29 15:07:11.2981995
>
> 1> select * from currency;
> // The current table only keeps the latest value
> currency   rate  RowStart                     RowEnd
> ---------- ----- ---------------------------- ----------------------------
> US Dollar  102   2020-06-29 15:06:24.7503288  9999-12-31 23:59:59.9999999
>
> The history table is essential for tracking historical versions. Please
> note that the *DELETE* operation also adds a record to the history table,
> and that record’s RowEndTime is the system time at which the DELETE
> happened. In short, a temporal table uses the time range [RowStart, RowEnd)
> to mark the period of validity, stores all versions of each record in the
> history table for history tracking, and uses the DBMS operation time to
> set RowStart and RowEnd.
>
> Back in our Flink world, a temporal table with an event-time attribute
> currently works well on data sources that contain INSERT and UPDATE
> messages, but not DELETE messages.
> Let us see what happens in the DELETE message scenario (i.e. a changelog
> source). CDC tools capture data changes from both DBMS temporal tables and
> ordinary tables, and emit them in the same format. I used Debezium to
> capture the changes of a SQL Server table:
>
> 1> select * from currency;
> currency   rate  RowStart                     RowEnd
> ---------- ----- ---------------------------- ----------------------------
> Euro       118   2020-06-29 15:07:01.1245406  9999-12-31 23:59:59.9999999
> US Dollar  102   2020-06-29 15:06:24.7503288  9999-12-31 23:59:59.9999999
>
> 1> DELETE FROM dbo.currency WHERE currency = 'Euro';
> // DELETE Euro currency
> 1> select * from currency_History;
> currency   rate  RowStart                     RowEnd
> ---------- ----- ---------------------------- ----------------------------
> Euro       118   2020-06-29 15:07:01.1245406  *2020-06-29 15:07:11.2981995*
>
> // The DELETE record produced by CDC tools (Debezium and Canal behave the
> // same)
> {
>   "before": {
>     "currency": "Euro",
>     "rate": 118,
>     "RowStart": 1593443221124540600,  // 2020-06-29 15:07:01.1245406
>     "RowEnd": -4852116231933722724    // 9999-12-31 23:59:59.9999999
>   },
>   "after": null,
>   "op": "d",  // DELETE operation
>   "ts_ms": 15934432361354,  // 2020-06-29 15:07:16.354
>   // the 'ts_ms' value is later than the time of the record's delete
>   // operation (*2020-06-29 15:07:11.2981995*)
>   "transaction": null
> }
>
> The main problem is that the *DELETE* record only contains the current
> table's row, which does not carry the expected RowEnd (*2020-06-29
> 15:07:11.2981995*) recorded in the history table. Without the exact
> RowEndTime, it’s impossible to reconstruct the exact previous version of
> the temporal table in Flink. The `ts_ms` field in the CDC record
> approximates RowEndTime, but it depends on the state of the CDC tool and,
> semantically, cannot be treated as equal to RowEndTime.
>
> The current Temporal Table Function supports:
> (1) Defining a temporal table backed by an upsert data source with
> processing time
> (2) Defining a temporal table backed by an upsert data source with
> event time
> I think the proposed temporal table could currently support:
> (1) Defining a temporal table backed by an upsert (including delete) data
> source with processing time
> (2) Defining a temporal table backed by an upsert data source with
> event time
> (3) No support for defining a temporal table with event time backed by a
> data source that contains DELETE messages. Most CDC tools currently cannot
> obtain the exact time of the DELETE operation; the “ts_ms” field from the
> metadata is only an approximation, which would break event-time semantics.
> We can support this once CDC tools are able to obtain/extract the DML
> operation time.
>
> Jingsong, Jark, Kurt and I reached consensus on this in an offline
> discussion, and the opinions of Knauf and Seth appear to match ours.
>
> I’ll prepare a design doc for temporal tables. Thanks to everyone involved,
> and please let me know if you have any concerns.
>
> Best,
> Leonard Xu
> [1]
> https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver15
> [2] https://pgxn.org/dist/temporal_tables/
>
>
>

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk
