Hi, all

I have opened a new discussion thread for FLIP-132 [1], based on the consensus
we reached in the current thread.

Let's continue the conversation in the new thread. Please let me know if you
have any concerns.

Best
Leonard
[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-132-Temporal-Table-DDL-td43483.html

> On Jul 7, 2020, at 00:31, Seth Wiesman <sjwies...@gmail.com> wrote:
> 
> * I mistyped the rejected_query; it should be
> 
> CREATE VIEW post_agg_stream AS
> SELECT currencyId, AVG(rate) AS rate
> FROM currency_rates
> GROUP BY currencyId;
> 
> CREATE VIEW rejected_query AS
> SELECT
>   ...
> FROM
>   transactions AS t
>   JOIN post_agg_stream FOR SYSTEM_TIME AS OF t.transactionTime AS r
>   ON r.currencyId = t.currencyId;
> 
> 
> On Mon, Jul 6, 2020 at 11:29 AM Seth Wiesman <sjwies...@gmail.com> wrote:
> 
>> Hey Leonard,
>> 
>> Agreed, this is a fun discussion!
>> 
>>> (1) To support changelog sources backed by CDC tools, one problem is
>>> whether we can use the temporal table as a general source table that may
>>> be followed by aggregation operations. More precisely, can the aggregation
>>> operator use a DELETE record, whose operation time we have just updated to
>>> the “correct” value, to retract a record? Maybe not. This pulls us back to
>>> the discussion of operation time vs. event time, which is a really cool
>>> but complicated topic; see the discussion above between @Jark and me.
>>> 
>> 
>> I fully agree this is a complicated topic; however, I don't think it's
>> actually a problem that needs to be solved for the first version of this
>> feature. My proposal is to disallow using upsert streams as temporal tables
>> if an aggregation operation has been applied. Going back to my notion that
>> temporal tables are a tool for performing streaming star schema
>> denormalization, the dimension table in a star schema is rarely aggregated
>> pre-join. In the case of a CDC stream of currency rates joined to
>> transactions, the CDC stream only needs to support filter pushdowns and
>> map-like transformations before being joined. I believe this is a
>> reasonable limitation we can impose that will unblock a large percentage of
>> use cases, and once we better understand the semantics of the correct
>> operation in a retraction the limitation can be removed in future versions
>> while remaining backward compatible.
>> 
>> 
>> 
>> CREATE TABLE currency_rates (
>>   currencyId BIGINT PRIMARY KEY,
>>   rate DECIMAL(10, 2)
>> ) WITH (
>>   'connector' = 'kafka',
>>   'format' = 'debezium-json'
>> );
>> 
>> CREATE TABLE transactions (
>>   currencyId BIGINT,
>>   transactionTime TIMESTAMP(3)
>> ) WITH (
>> 
>> );
>> 
>> -- Under my proposal this query would be supported because the
>> -- currency_rates table is used in a temporal join without any
>> -- aggregations having been applied.
>> 
>> CREATE VIEW working_query AS
>> SELECT
>>   ...
>> FROM
>>   transactions AS t
>>   JOIN currency_rates FOR SYSTEM_TIME AS OF t.transactionTime AS r
>>   ON r.currencyId = t.currencyId;
>> 
>> -- However, this query would be rejected by the planner until we
>> -- determine the proper time semantics of a retraction.
>> 
>> CREATE VIEW post_agg_stream AS
>> SELECT currencyId, AVG(rate) AS rate
>> FROM currency_rates
>> GROUP BY currencyId;
>> 
>> CREATE VIEW rejected_query AS
>> SELECT
>>   ...
>> FROM
>>   transactions AS t
>>   JOIN currency_rates FOR SYSTEM_TIME AS OF t.transactionTime AS r
>>   ON r.currencyId = t.currencyId;
>> 
>> 
>> 
>>> (2) For an upsert source that defines a PRIMARY KEY and may contain
>>> multiple records under one PK, the main problem is the PK semantics:
>>> multiple records under a PK break the uniqueness semantics of a table. We
>>> need to work around this by either (a) adding another primary-key keyword
>>> and explaining the upsert semantics, or (b) creating the temporal table
>>> based on a view that is the deduplicated result of the source table [2].
>>> 
>> 
>> This feels like more of a bikeshedding question than a blocker and I look
>> forward to seeing what you come up with!
>> 
>> Seth
>> 
>> On Mon, Jul 6, 2020 at 10:59 AM Benchao Li <libenc...@apache.org> wrote:
>> 
>>> Hi everyone,
>>> 
>>> Thanks a lot for the great discussions so far.
>>> 
>>> After reading through the long discussion, I still have one question.
>>> Currently the temporal table function supports both event-time and
>>> processing-time joins.
>>> If we use the "FOR SYSTEM_TIME AS OF" syntax without a "TEMPORAL" keyword
>>> in the DDL, does it mean we can only use the temporal table function join
>>> with event time?
>>> If we can, how do we distinguish it from the current temporal table (also
>>> known as a dimension table)?
>>> 
>>> Maybe I'm missing something here. Correct me if I'm wrong.
>>> 
>>> On Mon, Jul 6, 2020 at 11:34 PM Leonard Xu <xbjt...@gmail.com> wrote:
>>> 
>>>> Hi, Seth
>>>> 
>>>> Thanks for your explanation of the use cases. You’re right that the lookup
>>>> join/table is one kind of temporal table join/table, one that tracks the
>>>> latest snapshot of external DB-like tables; that is why we proposed using
>>>> the same temporal join syntax.
>>>> 
>>>> In fact, I have investigated the Debezium and Canal formats more closely
>>>> these days, and we can obtain the exact DML operation time from their
>>>> meta information, which comes from the DB binlog. Although extracting
>>>> meta information from records is part of the scope of FLIP-107 [1], at
>>>> least we have a way to extract the correct operation time. Even if we can
>>>> obtain the expected operation time, there are still some problems.
>>>> 
>>>> (1) To support changelog sources backed by CDC tools, one problem is
>>>> whether we can use the temporal table as a general source table that may
>>>> be followed by aggregation operations. More precisely, can the aggregation
>>>> operator use a DELETE record, whose operation time we have just updated to
>>>> the “correct” value, to retract a record? Maybe not. This pulls us back to
>>>> the discussion of operation time vs. event time, which is a really cool
>>>> but complicated topic; see the discussion above between @Jark and me.
>>>> 
>>>> (2) For an upsert source that defines a PRIMARY KEY and may contain
>>>> multiple records under one PK, the main problem is the PK semantics:
>>>> multiple records under a PK break the uniqueness semantics of a table. We
>>>> need to work around this by either (a) adding another primary-key keyword
>>>> and explaining the upsert semantics, or (b) creating the temporal table
>>>> based on a view that is the deduplicated result of the source table [2].
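>>>> 
>>>> A minimal sketch of option (b), following the deduplication pattern
>>>> described in [2]. The table name currency_rates_log, the view name
>>>> latest_rates and the time attribute update_time are hypothetical names
>>>> used only for illustration; they are not part of the proposal.
>>>> 
>>>> ```sql
>>>> -- Assumption: currency_rates_log is an append-only source that may hold
>>>> -- several rows per currencyId, and update_time is its time attribute.
>>>> CREATE VIEW latest_rates AS
>>>> SELECT currencyId, rate, update_time
>>>> FROM (
>>>>   SELECT currencyId, rate, update_time,
>>>>     -- Number the rows of each key from newest to oldest.
>>>>     ROW_NUMBER() OVER (
>>>>       PARTITION BY currencyId
>>>>       ORDER BY update_time DESC) AS rownum
>>>>   FROM currency_rates_log
>>>> )
>>>> -- Keep only the newest row per key, so currencyId is unique in the view.
>>>> WHERE rownum = 1;
>>>> ```
>>>> 
>>>> The view holds one row per currencyId, which restores the uniqueness
>>>> that a primary key implies. Whether such a view may then appear in a
>>>> FOR SYSTEM_TIME AS OF join is what the proposal would still have to
>>>> settle.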
>>>> 
>>>> I’m working on (2). If we want to support (1), i.e. support DELETE
>>>> entirely, that’s really a big challenge, but I also think it is the right
>>>> thing for the long term.
>>>> 
>>>> If we decide to do (1), we first need to introduce the concept of
>>>> operation time, then change the codebase in many places to handle the
>>>> operation-time header, and finally explain to users how to understand and
>>>> use temporal tables.
>>>> 
>>>> I’m a little worried about whether it’s valuable enough. I proposed
>>>> supporting only (2) because it is a good replacement for the current
>>>> Temporal Table Function and will not introduce more concepts and work.
>>>> 
>>>> Jark, Jingsong, Konstantin, WDYT?
>>>> 
>>>> 
>>>> Best,
>>>> Leonard Xu
>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Accessread-onlymetadatae.g.partition
>>>> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>>>> 
>>>> 
>>>>> On Jul 6, 2020, at 22:02, Seth Wiesman <sjwies...@gmail.com> wrote:
>>>>> 
>>>>> As an aside, I conceptually view temporal table joins to be
>>>> semantically equivalent to look up table joins. They are just two different
>>>> ways of consuming the same data.
>>>>> 
>>>>> Seth
>>>>> 
>>>>> On Mon, Jul 6, 2020 at 8:56 AM Seth Wiesman <sjwies...@gmail.com> wrote:
>>>>> Hi Leonard,
>>>>> 
>>>>> Regarding DELETE operations I tend to have the opposite reaction. I
>>>> spend a lot of time working with production Flink users across a large
>>>> number of organizations and to say we don't support temporal tables that
>>>> include DELETEs will be a blocker for adoption. Even organizations that
>>>> claim to never delete rows still occasionally do so per GDPR requests or
>>>> other regulations.
>>>>> 
>>>>> I actually do think users will understand the limitations. Flink today
>>>> has a very clear value proposition around correctness: your results are as
>>>> correct as the input data provided. This does not change under support for
>>>> DELETE records. Flink is providing the most correct results possible based
>>>> on the resolution of the fields as generated by 3rd party systems. As
>>>> Debezium and other CDC libraries become more accurate, so will Flink.
>>>>> 
>>>>> Seth
>>>>> 
>>>>> On Fri, Jul 3, 2020 at 11:00 PM Leonard Xu <xbjt...@gmail.com> wrote:
>>>>> Hi, Konstantin
>>>>> 
>>>>>> . Would we support a temporal join with a changelog stream with
>>>>>> event time semantics by ignoring DELETE messages or would it be
>>>>>> completely unsupported?
>>>>> 
>>>>> I don’t know what share of temporal-join scenarios need this feature.
>>>>> 
>>>>> As for supporting an approximate event-time join, either by ignoring
>>>>> DELETE messages or by extracting an approximate event time for them,
>>>>> I’m not sure this is acceptable to users even if we explain the
>>>>> limitations of an approximate event-time join. I lean towards not
>>>>> supporting this feature, because we cannot guarantee event-time
>>>>> semantics and it may lead to incorrect results in some scenarios.
>>>>> 
>>>>> If the percentage is high enough and most use cases can accept the
>>>>> approximate event time, I'm OK with supporting it for usability,
>>>>> although it doesn’t implement event-time semantics strictly.
>>>>> 
>>>>> Cheers,
>>>>> Leonard Xu
>>>>> 
>>>>> 
>>>> 
>>>> 
>>> 
>>> --
>>> 
>>> Best,
>>> Benchao Li
>>> 
>> 
