Hi Kurt, Fabian,

After an offline discussion with Jark, we think that the 'PERIOD FOR 
SYSTEM_TIME(operation_time)' statement might be needed now. A changelog table 
is a superset of an insert-only table; using PRIMARY KEY and rowtime may work 
well for insert-only or upsert sources, but it has some problems for changelog 
tables.

'PERIOD FOR SYSTEM_TIME(operation_time)' in a temporal table defines/maintains 
the valid time of each row; the rowtime cannot play the history-tracking role 
well.

# 1. Operation time (version time) vs. rowtime (watermark)

Let me give an example to explain. The following changelog records came from a 
database table, captured by the Debezium tool:
{ "before":  null,
  "after":    {"currency": "Euro", "rate": 114, "gmt_modified": "12:00:01"},
  "op":       "c",  // INSERT
  "ts_ms":    1592971201000 // 2020-06-24 12:00:02
}
{ "before": {"currency": "Euro", "rate": 114, "gmt_modified": "12:00:05"},
  "after":    {"currency": "Euro", "rate": 118, "gmt_modified": "12:00:05"},
  "op":       "u",  //UPDATE
  "ts_ms": 1592971206000 // 2020-06-24 12:00:06
}

{ "before": {"currency": "Euro", "rate": 118, "gmt_modified": "12:00:05"},
  "after":     null,
  "op":        "d",  //DELETE
  "ts_ms":  1593000011000  // 2020-06-24 20:00:11
}

The rowtime should be the "gmt_modified" field, which belongs to the original 
record; the "ts_ms" is the operation time at which the DML statement happened 
in the database. For the DELETE changelog record, its "gmt_modified" field 
(12:00:05) cannot reflect the real operation time (20:00:11).

In the temporal join case, we should maintain the valid time of each row. For 
a DELETE event, we should use the operation time of the DELETE as the "end 
time" of the row. That is to say, the record {"currency": "Euro", "rate": 118} 
no longer exists after "20:00:11", not after "12:00:05".

If we used rowtime to track the history versions, we could not access the 
record {"currency": "Euro", "rate": 118, "gmt_modified": "12:00:05"} once the 
rowtime is bigger than (12:00:05), because the DELETE changelog record also 
carries rowtime (12:00:05) and would clear the record from state. In fact, the 
expected result is that the record expires at (20:00:11), when it is actually 
deleted, rather than at its last update time (12:00:05) in the materialized 
state.

From this case, I found that rowtime and operation time should be orthogonal 
in the temporal table scenario. The operation time should be strictly 
monotonically increasing (no out-of-order events) and should only be used for 
tracking the history versions of a changelog table; every history version of a 
changelog table equals a database table snapshot, due to the stream-table 
duality.

# 2. The semantics of rowtime and watermark on a changelog

Rowtime and watermark can also be defined on a changelog table, just like on 
other queue-backed sources. For example, Flink supports cascaded window 
aggregation in SQL:
SELECT
     TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND),
     MAX(rate) AS rate
FROM (
       SELECT
          MAX(rate) AS rate,
          TUMBLE_ROWTIME(rowtime, INTERVAL '5' SECOND) AS `rowtime`
       FROM currency
       GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND)
    )
GROUP BY TUMBLE(rowtime, INTERVAL '60' SECOND)

We can think of the output of the first window aggregation as a changelog 
source for the second window aggregation. There are INSERT/UPDATE/DELETE 
messages as well as watermarks in the changelog stream, and the rowtime in the 
changelog stream is the `TUMBLE_ROWTIME` value (just like the `gmt_modified` 
column in the DB).

# Summary

We should use the 'PERIOD FOR SYSTEM_TIME(operation_time)' syntax to track 
history versions by operation time rather than by rowtime in the temporal 
table scenario.
We should also support defining a rowtime (watermark) on a changelog table, 
but the rowtime will not be used to track the history of the changelog stream.
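To illustrate, a DDL shape could look like the following. This is just a 
sketch of the proposed syntax, not something current Flink supports; the 
`op_time` column and its mapping from Debezium's `ts_ms` are assumptions:

CREATE TABLE currency (
  currency CHAR(3) NOT NULL,
  rate DOUBLE,
  gmt_modified TIMESTAMP(3),       -- rowtime from the original record
  op_time TIMESTAMP(3),            -- operation time, e.g. from Debezium's ts_ms
  PRIMARY KEY (currency) NOT ENFORCED,
  WATERMARK FOR gmt_modified AS gmt_modified - INTERVAL '5' SECOND,
  PERIOD FOR SYSTEM_TIME(op_time)  -- tracks the history versions
) WITH (...);

Here the rowtime/watermark would still drive window operations as usual, while 
op_time alone would determine the validity interval of each row version.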


WDYT? Please correct me if I am wrong.


Best,
Leonard




> 在 2020年6月24日,11:31,Leonard Xu <xbjt...@gmail.com> 写道:
> 
> Hi, everyone
> 
> Thanks Fabian,Kurt for making the multiple version(event time) clear, I also 
> like the 'PERIOD FOR SYSTEM' syntax which supported in SQL standard. I think 
> we can add some explanation of the multiple version support in the future 
> section of FLIP.
> 
> For the PRIMARY KEY semantic, I agree with Jark's point that the semantic 
> should unify both on changelog source and insert-only source.
> 
> Currently, Flink supports PRIMARY KEY after FLIP-87, Flink uses PRIMARY KEY 
> NOT ENFORCED because Flink does not own the data like other DBMS therefore 
> Flink won't validate/enforce the key integrity and only trusts the external 
> systems. It is  expected user and external system/application should make 
> sure no deduplicated records happened when using NOT ENFORCED.
> 
> (a) For PRIMARY KEY NOT ENFORCED semantic on changelog source:
> It means the materialized changelogs (INSERT/UPDATE/DELETE) should be unique 
> on the primary key constraints.Flink assumes messages are in order on the 
> primary key. Flink will use the PRIMARY KEY for some optimization, e.g. use 
> the PRIMARY KEY to update the materialized state by key in temporal join 
> operator. 
>  
> (b) For PRIMARY KEY NOT ENFORCED semantic on insert-only source:
> It means records should be unique on the primary key constraints. If there 
> are INSERT records with duplicate primary key columns, the result of SQL 
> query might be nondeterministic because it broken the PRIMARY KEY constraints.
> 
> Cheers,
> Leonard
> 
> 
>> 在 2020年6月23日,23:35,Fabian Hueske <fhue...@gmail.com 
>> <mailto:fhue...@gmail.com>> 写道:
>> 
>> Thanks Kurt,
>> 
>> Yes, you are right.
>> The `PERIOD FOR SYSTEM_TIME` that you linked before corresponds to the
>> VERSION clause that I used and would explicitly define the versioning of a
>> table.
>> I didn't know that the `PERIOD FOR SYSTEM_TIME` cause is already defined by
>> the SQL standard.
>> I think we would need a slightly different syntax though because (so far)
>> the validity of a row is determined by its own timestamp and the timestamp
>> of the next row.
>> 
>> Adding a clause later solves the ambiguity issue for tables with multiple
>> event-time attributes.
>> However, I'd feel more comfortable having such a cause and an explicit
>> definition of the temporal property from the beginning.
>> I guess this is a matter of personal preference so I'll go with the
>> majority if we decide that every table that has a primary key and an
>> event-time attribute should be usable in an event-time temporal table join.
>> 
>> Thanks, Fabian
>> 
>> 
>> Am Di., 23. Juni 2020 um 16:58 Uhr schrieb Kurt Young <ykt...@gmail.com 
>> <mailto:ykt...@gmail.com>>:
>> 
>>> Hi Fabian,
>>> 
>>> I agree with you that implicitly letting event time to be the version of
>>> the table will
>>> work in most cases, but not for all. That's the reason I mentioned `PERIOD
>>> FOR` [1]
>>> syntax in my first email, which is already in sql standard to represent the
>>> validity of
>>> each row in the table.
>>> 
>>> If the event time can't be used, or multiple event time are defined, we
>>> could still add
>>> this syntax in the future.
>>> 
>>> What do you think?
>>> 
>>> [1]
>>> 
>>> https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver15
>>>  
>>> <https://docs.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver15>
>>> Best,
>>> Kurt
>>> 
>>> 
>>> On Tue, Jun 23, 2020 at 9:12 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>> 
>>>> Hi everyone,
>>>> 
>>>> Every table with a primary key and an event-time attribute provides what
>>> is
>>>> needed for an event-time temporal table join.
>>>> I agree that, from a technical point of view, the TEMPORAL keyword is not
>>>> required.
>>>> 
>>>> I'm more sceptical about implicitly deriving the versioning information
>>> of
>>>> a (temporal) table as the table's only event-time attribute.
>>>> In the query
>>>> 
>>>> SELECT *
>>>> FROM orders o, rates r FOR SYSTEM_TIME AS OF o.ordertime
>>>> WHERE o.currency = r.currency
>>>> 
>>>> the syntax of the temporal table join does not explicitly reference the
>>>> version of the temporal rates table.
>>>> Hence, the system needs a way to derive the version of temporal table.
>>>> 
>>>> Implicitly using the (only) event-time attribute of a temporal table
>>> (rates
>>>> in the example above) to identify the right version works in most cases,
>>>> but probably not in all.
>>>> * What if a table has more than one event-time attribute? (TableSchema is
>>>> designed to support multiple watermarks; queries with interval joins
>>>> produce tables with multiple event-time attributes, ...)
>>>> * What if the table does not have an event-time attribute in its schema
>>> but
>>>> the version should only be provided as meta data?
>>>> 
>>>> We could add a clause to define the version of a table, such as:
>>>> 
>>>> CREATE TABLE rates (
>>>>   currency CHAR(3) NOT NULL PRIMARY KEY,
>>>>   rate DOUBLE,
>>>>   rowtime TIMESTAMP,
>>>>   WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE),
>>>> VERSION (rowtime)
>>>> WITH (...);
>>>> 
>>>> The presence of a the VERSION clause (or whatever syntax) would
>>> explicitly
>>>> define the version of a (temporal) table.
>>>> It would also render the need for the TEMPORAL keyword superfluous
>>> because
>>>> there would be another indicator that a table can be used in a temporal
>>>> table join.
>>>> 
>>>> I'm OK with not adding the TEMPORAL keyword, but I recommend that we
>>> think
>>>> again about the proposed implicit definition of a table's version and how
>>>> it might limit use in the future.
>>>> 
>>>> Cheers,
>>>> Fabian
>>>> 
>>>> Am Mo., 22. Juni 2020 um 16:14 Uhr schrieb Jark Wu <imj...@gmail.com>:
>>>> 
>>>>> I'm also +1 for not adding the TEMPORAL keyword.
>>>>> 
>>>>> +1 to make the PRIMARY KEY semantic clear for sources.
>>>>> From my point of view:
>>>>> 
>>>>> 1) PRIMARY KEY on changelog souruce:
>>>>> It means that when the changelogs (INSERT/UPDATE/DELETE) are
>>>> materialized,
>>>>> the materialized table should be unique on the primary key columns.
>>>>> Flink assumes messages are in order on the primary key. Flink doesn't
>>>>> validate/enforces the key integrity, but simply trust it (thus NOT
>>>>> ENFORCED).
>>>>> Flink will use the PRIMARY KEY for some optimization, e.g. use the
>>>> PRIMARY
>>>>> KEY to update the materilized state by key in temporal join operator.
>>>>> 
>>>>> 2) PRIMARY KEY on insert-only source:
>>>>> I prefer to have the same semantic to the batch source and changelog
>>>>> source, that it implies that records are not duplicate on the primary
>>>> key.
>>>>> Flink just simply trust the primary key constraint, and doesn't valid
>>> it.
>>>>> If there is duplicate primary keys with INSERT changeflag, then result
>>> of
>>>>> Flink query might be wrong.
>>>>> 
>>>>> If this is a TEMPORAL TABLE FUNCTION scenario, that source emits
>>>> duplicate
>>>>> primary keys with INSERT changeflag, when we migrate this case to
>>>> temporal
>>>>> table DDL,
>>>>> I think this source should emit INSERT/UPDATE (UPSERT) messages instead
>>>> of
>>>>> INSERT-only messages,  e.g. a Kafka compacted topic source?
>>>>> 
>>>>> Best,
>>>>> Jark
>>>>> 
>>>>> 
>>>>> On Mon, 22 Jun 2020 at 17:04, Konstantin Knauf <kna...@apache.org>
>>>> wrote:
>>>>> 
>>>>>> Hi everyone,
>>>>>> 
>>>>>> I also agree with Leonard/Kurt's proposal for CREATE TEMPORAL TABLE.
>>>>>> 
>>>>>> Best,
>>>>>> 
>>>>>> Konstantin
>>>>>> 
>>>>>> On Mon, Jun 22, 2020 at 10:53 AM Kurt Young <ykt...@gmail.com>
>>> wrote:
>>>>>> 
>>>>>>> I agree with Timo, semantic about primary key needs more thought
>>> and
>>>>>>> discussion, especially after FLIP-95 and FLIP-105.
>>>>>>> 
>>>>>>> Best,
>>>>>>> Kurt
>>>>>>> 
>>>>>>> 
>>>>>>> On Mon, Jun 22, 2020 at 4:45 PM Timo Walther <twal...@apache.org>
>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi Leonard,
>>>>>>>> 
>>>>>>>> thanks for the summary.
>>>>>>>> 
>>>>>>>> After reading all of the previous arguments and working on
>>>> FLIP-95. I
>>>>>>>> would also lean towards the conclusion of not adding the TEMPORAL
>>>>>>> keyword.
>>>>>>>> 
>>>>>>>> After FLIP-95, what we considered as a CREATE TEMPORAL TABLE can
>>> be
>>>>>>>> represented as a CREATE TABLE with PRIMARY KEY and WATERMARK. The
>>>> FOR
>>>>>>>> SYSTEM_TIME AS OF t would trigger the internal materialization
>>> and
>>>>>>>> "temporal" logic.
>>>>>>>> 
>>>>>>>> However, we should discuss the meaning of PRIMARY KEY again in
>>> this
>>>>>>>> case. In a TEMPORAL TABLE scenario, the source would emit
>>> duplicate
>>>>>>>> primary keys with INSERT changeflag but at different point in
>>> time.
>>>>>>>> Currently, we require a PRIMARY KEY NOT ENFORCED declaration. The
>>>>>>>> changelog semantics of FLIP-95 and FLIP-105 don't work well with
>>> a
>>>>>>>> primary key declaration.
>>>>>>>> 
>>>>>>>> Regards,
>>>>>>>> Timo
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On 20.06.20 17:08, Leonard Xu wrote:
>>>>>>>>> Hi everyone,
>>>>>>>>> 
>>>>>>>>> Thanks for the nice discussion. I’d like to move forward the
>>>> work,
>>>>>>>> please let me simply summarize the main opinion and current
>>>>>> divergences.
>>>>>>>>> 
>>>>>>>>> 1. The agreements have been achieved:
>>>>>>>>> 
>>>>>>>>> 1.1 The motivation we're discussing temporal table DDL is just
>>>> for
>>>>>>>> creating temporal table in pure SQL to replace pre-process
>>> temporal
>>>>>> table
>>>>>>>> in YAML/Table API for usability.
>>>>>>>>> 1.2 The reason we use "TEMPORAL" keyword rather than “PERIOD
>>> FOR
>>>>>>>> SYSTEM_TIME” is to make user understand easily.
>>>>>>>>> 1.3 For append-only table, it can convert to changelog table
>>>> which
>>>>>> has
>>>>>>>> been discussed in FLIP-105, we assume the following temporal
>>> table
>>>> is
>>>>>>> comes
>>>>>>>> from changelog (Jark, fabian, Timo).
>>>>>>>>> 1.4 For temporal join syntax, using "FOR SYSTEM_TIME AS OF x"
>>>>> instead
>>>>>>> of
>>>>>>>> the current `LATERAL TABLE(rates(x))`  has come to an
>>>>> agreement(Fabian,
>>>>>>>> Timo, Seth, Konstantin, Kurt).
>>>>>>>>> 
>>>>>>>>> 2. The small divergence :
>>>>>>>>> 
>>>>>>>>> About the definition syntax of the temporal table,
>>>>>>>>> 
>>>>>>>>> CREATE [TEMPORAL] TABLE rates (
>>>>>>>>>    currency CHAR(3) NOT NULL PRIMARY KEY,
>>>>>>>>>    rate DOUBLE,
>>>>>>>>>    rowtime TIMESTAMP,
>>>>>>>>>    WATERMARK FOR rowtime AS rowtime - INTERVAL '5' MINUTE)
>>>>>>>>> WITH (...);
>>>>>>>>> 
>>>>>>>>> there is small divergence whether add "TEMPORAL" keyword or
>>> not.
>>>>>>>>> 
>>>>>>>>> 2.1  one opinion is using "CREATE TEMPORAL TABLE" (Timo,
>>> Fabian,
>>>>>> Seth),
>>>>>>>> the main advantages are:
>>>>>>>>> (1)"TEMPORAL" keyword is intuitive to indicate the history
>>>> tracking
>>>>>>>> semantics.
>>>>>>>>> (2)"TEMPORAL" keyword illustrates that queries can visit the
>>>>> previous
>>>>>>>> versions of a table like other DBMS use "PERIOD FOR SYSTEM_TIME"
>>>>>> keyword.
>>>>>>>>> 
>>>>>>>>> 2.2 the other is using "CREATE TABLE"(Kurt), the main
>>> advantages
>>>>> are:
>>>>>>>>> (1)Just primary key and time attribute can track previous
>>>> versions
>>>>>> of a
>>>>>>>> table well.
>>>>>>>>> (2)The temporal behavior is triggered by temporal join syntax
>>>>> rather
>>>>>>>> than in DDL, all Flink DDL table are dynamic table logically
>>>>> including
>>>>>>>> temporal table. If we decide to use "TEMPORAL" keyword and treats
>>>>>>> changelog
>>>>>>>> as temporal table, other tables backed queue like Kafka should
>>> also
>>>>> use
>>>>>>>> "TEMPORAL" keyword.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> IMO, the statement “CREATE TEMPORARY TEMPORAL TABLE...” follows
>>>>> with
>>>>>>> 2.1
>>>>>>>> may confuse users much. If we take a second to think about, for
>>>>>>> source/sink
>>>>>>>> table which may backed queue (like kafka) or DB (like MySQL), we
>>>> did
>>>>>> not
>>>>>>>> add any keyword in DDL to specify they are source or sinks, it
>>>> works
>>>>>>> well.
>>>>>>>>> I think temporal table is the third one,  kafka data source and
>>>> DB
>>>>>> data
>>>>>>>> source can play as a source/sink/temporal table depends on the
>>>>>>>> position/syntax that user put them in the query. The above rates
>>>>> table
>>>>>>>>>     - can be a source table if user put it at `SELECT * FROM
>>>>> rates;`
>>>>>>>>>     - can be a temporal table if user put it at `SELECT * FROM
>>>>>> orders
>>>>>>>> JOIN rates FOR SYSTEM_TIME AS OF orders.proctime
>>>>>>>>>              ON orders.currency = rates.currency;`
>>>>>>>>>     - can be sink table if user put is at `INSERT INTO rates
>>>>> SELECT
>>>>>> *
>>>>>>>> FROM …; `
>>>>>>>>> From these cases, we found all tables defined in Flink should
>>> be
>>>>>>>> dynamic table logically, the source/sink/temporal role depends on
>>>> the
>>>>>>>> position/syntax in user’s query.
>>>>>>>>>       In fact we have used similar syntax for current lookup
>>>>> table,
>>>>>> we
>>>>>>>> didn’t add “LOOKUP" or “TEMPORAL" keyword for lookup table and
>>>>> trigger
>>>>>>> the
>>>>>>>> temporal join from the position/syntax(“FOR SYSTEM_TIME AS OF x")
>>>> in
>>>>>>> query.
>>>>>>>>> 
>>>>>>>>> So, I prefer to resolve the small divergence with “CREATE
>>> TABLE”
>>>>>> which
>>>>>>>>> (1) is more unified with our source/sink/temporal dynamic table
>>>>>>>> conceptually,
>>>>>>>>> (2) is aligned with current lookup table,
>>>>>>>>> (3) also make users learn less keyword.
>>>>>>>>> 
>>>>>>>>> WDYT?
>>>>>>>>> 
>>>>>>>>> Best,
>>>>>>>>> Leonard Xu
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> 
>>>>>> Konstantin Knauf
>>>>>> 
>>>>>> https://twitter.com/snntrable
>>>>>> 
>>>>>> https://github.com/knaufk
>>>>>> 
>>>>> 
>>>> 
>>> 
> 
