Hi all, I have opened a new discussion for FLIP-132[1] based on our consensus in the current thread.
Let me keep the communication in the new thread; please let me know if you have any concerns.

Best,
Leonard

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-132-Temporal-Table-DDL-td43483.html

> On Jul 7, 2020, at 00:31, Seth Wiesman <sjwies...@gmail.com> wrote:
>
> * I mistyped the rejected_query; it should be:
>
> CREATE VIEW post_agg_stream AS
> SELECT currencyId, AVG(rate) AS rate
> FROM currency_rates;
>
> CREATE VIEW rejected_query AS
> SELECT ...
> FROM transactions AS t
> JOIN post_agg_stream FOR SYSTEM_TIME AS OF t.transactionTime AS r
>   ON r.currencyId = t.currencyId;
>
> On Mon, Jul 6, 2020 at 11:29 AM Seth Wiesman <sjwies...@gmail.com> wrote:
>
>> Hey Leonard,
>>
>> Agreed, this is a fun discussion!
>>
>>> (1) For changelog sources backed by CDC tools, one problem is whether we
>>> can use the temporal table as a general source table that may be followed
>>> by some aggregation operations; more precisely, whether the aggregation
>>> operator can use a DELETE record whose operation time we just corrected
>>> to retract a record. Maybe not. This pulls us back to the discussion of
>>> operation time vs. event time, a really cool but complicated topic; see
>>> the discussion above from me and @Jark.
>>
>> I fully agree this is a complicated topic; however, I don't think it's
>> actually a problem that needs to be solved for the first version of this
>> feature. My proposal is to disallow using upsert streams as temporal
>> tables if an aggregation operation has been applied. Going back to my
>> notion that temporal tables are a tool for performing streaming
>> star-schema denormalization, the dimension table in a star schema is
>> rarely aggregated pre-join.
>> In the case of a CDC stream of currency rates joined to transactions, the
>> CDC stream only needs to support filter pushdowns and map-like
>> transformations before being joined. I believe this is a reasonable
>> limitation we can impose that will unblock a large percentage of use
>> cases, and once we better understand the semantics of the correct
>> operation time in a retraction, the limitation can be removed in future
>> versions while remaining backward compatible.
>>
>> CREATE TABLE currency_rates (
>>   currencyId BIGINT PRIMARY KEY,
>>   rate DECIMAL(10, 2)
>> ) WITH (
>>   'connector' = 'kafka',
>>   'format' = 'debezium-json'
>> );
>>
>> CREATE TABLE transactions (
>>   currencyId BIGINT,
>>   transactionTime TIMESTAMP(3)
>> ) WITH (
>>   ...
>> );
>>
>> -- Under my proposal this query would be supported, because the
>> -- currency_rates table is used in a temporal join without any
>> -- aggregations having been applied.
>>
>> CREATE VIEW working_query AS
>> SELECT ...
>> FROM transactions AS t
>> JOIN currency_rates FOR SYSTEM_TIME AS OF t.transactionTime AS r
>>   ON r.currencyId = t.currencyId;
>>
>> -- However, this query would be rejected by the planner until we determine
>> -- the proper time semantics of a retraction.
>>
>> CREATE VIEW post_agg_stream AS
>> SELECT currencyId, AVG(rate) AS rate
>> FROM currency_rates;
>>
>> CREATE VIEW rejected_query AS
>> SELECT ...
>> FROM transactions AS t
>> JOIN post_agg_stream FOR SYSTEM_TIME AS OF t.transactionTime AS r
>>   ON r.currencyId = t.currencyId;
>>
>>> (2) For an upsert source that defines a PRIMARY KEY and may contain
>>> multiple records under a PK, the main problem is the PK semantic: the
>>> multiple records under a PK break the unique semantic of a table. We
>>> need to work around this by (a) adding another primary key keyword and
>>> explaining the upsert semantic, or (b) creating the temporal table based
>>> on a view that is the deduplicated result of the source table[2].
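Option (b), the deduplication view, corresponds to the Flink SQL deduplication pattern referenced as [2] in this thread. A minimal sketch, reusing the currency_rates columns from the example above and assuming a hypothetical change-time column `update_time`:

```sql
-- Deduplicate an upsert stream down to the latest record per primary key.
-- ROW_NUMBER() partitioned by the key, ordered by a time attribute, and
-- filtered to rownum = 1 is the documented Flink SQL deduplication pattern.
CREATE VIEW deduped_rates AS
SELECT currencyId, rate, update_time
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY currencyId       -- the upsert key
      ORDER BY update_time DESC     -- keep only the most recent version
    ) AS rownum
  FROM currency_rates
)
WHERE rownum = 1;
```

A temporal table could then be defined on top of `deduped_rates`, restoring the unique-key semantics that the raw upsert stream lacks.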
>> This feels like more of a bikeshedding question than a blocker, and I
>> look forward to seeing what you come up with!
>>
>> Seth
>>
>> On Mon, Jul 6, 2020 at 10:59 AM Benchao Li <libenc...@apache.org> wrote:
>>
>>> Hi everyone,
>>>
>>> Thanks a lot for the great discussions so far.
>>>
>>> After reading through the long discussion, I still have one question.
>>> Currently the temporal table function supports both event-time and
>>> processing-time joins. If we use the "FOR SYSTEM_TIME AS OF" syntax
>>> without the "TEMPORAL" keyword in the DDL, does it mean we can only use
>>> the temporal table function join with event time? If we can, how do we
>>> distinguish it from the current temporal table (also known as a
>>> dimension table)?
>>>
>>> Maybe I'm missing something here. Correct me if I'm wrong.
>>>
>>> On Mon, Jul 6, 2020, at 11:34 PM, Leonard Xu <xbjt...@gmail.com> wrote:
>>>
>>>> Hi, Seth
>>>>
>>>> Thanks for your explanation of the user cases, and you're right that
>>>> the lookup join/table is one kind of temporal table join/table, one
>>>> that tracks the latest snapshot of external DB-like tables; that is
>>>> why we proposed using the same temporal join syntax.
>>>>
>>>> In fact, I have investigated the Debezium and Canal formats more these
>>>> days, and we can obtain the exact DML operation time from their meta
>>>> information, which comes from the DB binlog. Although extracting meta
>>>> information from a record is part of the FLIP-107 scope[1], at least
>>>> we have a way to extract the correct operation time. Even if we can
>>>> obtain the expected operation time, there are still some problems.
>>>>
>>>> (1) For changelog sources backed by CDC tools, one problem is whether
>>>> we can use the temporal table as a general source table that may be
>>>> followed by some aggregation operations; more precisely, whether the
>>>> aggregation operator can use a DELETE record whose operation time we
>>>> just corrected to retract a record. Maybe not.
>>>> This pulls us back to the discussion of operation time vs. event
>>>> time, a really cool but complicated topic; see the discussion above
>>>> from me and @Jark.
>>>>
>>>> (2) For an upsert source that defines a PRIMARY KEY and may contain
>>>> multiple records under a PK, the main problem is the PK semantic: the
>>>> multiple records under a PK break the unique semantic of a table. We
>>>> need to work around this by (a) adding another primary key keyword
>>>> and explaining the upsert semantic, or (b) creating the temporal
>>>> table based on a view that is the deduplicated result of the source
>>>> table[2].
>>>>
>>>> I'm working on (2), and if we want to support (1), i.e. support
>>>> DELETE entirely, that's really a big challenge, but I also think it
>>>> is the right thing for the long term.
>>>>
>>>> If we decide to do (1), we first need to introduce the operation time
>>>> concept, then change the codebase to deal with the operation time
>>>> header in many places, and finally explain to users how to understand
>>>> and use temporal tables.
>>>>
>>>> I'm a little worried about whether it's valuable enough. I proposed
>>>> only supporting (2) because it is a good replacement for the current
>>>> temporal table function and will not introduce more concepts and
>>>> work.
>>>>
>>>> Jark, Jingsong, Konstantin, WDYT?
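The "extract the operation time from the Debezium meta information" idea could be sketched as DDL along the lines below. This assumes a FLIP-107-style metadata column syntax; the metadata key 'value.source.timestamp' and the column name operation_time are illustrative assumptions, not settled syntax:

```sql
-- Sketch only: expose the binlog operation time carried in the Debezium
-- envelope as a table column, using an assumed FLIP-107-style syntax.
CREATE TABLE currency_rates (
  currencyId BIGINT PRIMARY KEY,
  rate DECIMAL(10, 2),
  -- hypothetical metadata key; the exact name depends on FLIP-107's outcome
  operation_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'format' = 'debezium-json'
);
```

With such a column, an event-time temporal table could order its versions by operation_time rather than by ingestion time, which is what makes the DELETE retraction question above concrete.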
>>>>
>>>> Best,
>>>> Leonard Xu
>>>>
>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Accessread-onlymetadatae.g.partition
>>>> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>>>>
>>>>> On Jul 6, 2020, at 22:02, Seth Wiesman <sjwies...@gmail.com> wrote:
>>>>>
>>>>> As an aside, I conceptually view temporal table joins as semantically
>>>>> equivalent to lookup table joins. They are just two different ways of
>>>>> consuming the same data.
>>>>>
>>>>> Seth
>>>>>
>>>>> On Mon, Jul 6, 2020 at 8:56 AM Seth Wiesman <sjwies...@gmail.com> wrote:
>>>>>
>>>>> Hi Leonard,
>>>>>
>>>>> Regarding DELETE operations I tend to have the opposite reaction. I
>>>>> spend a lot of time working with production Flink users across a
>>>>> large number of organizations, and saying we don't support temporal
>>>>> tables that include DELETEs will be a blocker for adoption. Even
>>>>> organizations that claim to never delete rows still occasionally do
>>>>> so for GDPR requests or other regulations.
>>>>>
>>>>> I actually do think users will understand the limitations. Flink
>>>>> today has a very clear value proposition around correctness: your
>>>>> results are as correct as the input data provided. This does not
>>>>> change under support for DELETE records. Flink is providing the most
>>>>> correct results possible based on the resolution of the fields as
>>>>> generated by third-party systems.
>>>>> As Debezium and other CDC libraries become more accurate, so will
>>>>> Flink.
>>>>>
>>>>> Seth
>>>>>
>>>>> On Fri, Jul 3, 2020 at 11:00 PM Leonard Xu <xbjt...@gmail.com> wrote:
>>>>>
>>>>> Hi, Konstantin
>>>>>
>>>>>> Would we support a temporal join with a changelog stream with event
>>>>>> time semantics by ignoring DELETE messages, or would it be
>>>>>> completely unsupported?
>>>>>
>>>>> I don't know the percentage of this feature in temporal scenarios.
>>>>>
>>>>> Compared to supporting an approximate event-time join by ignoring
>>>>> DELETE messages or by extracting an approximate event time for
>>>>> DELETE messages, I'm not sure this is acceptable for users even if
>>>>> we have explained the limitation of the approximate event-time join.
>>>>> I tend not to support this feature, because we cannot ensure
>>>>> event-time semantics and it may lead to incorrect results for users
>>>>> in some scenarios.
>>>>>
>>>>> If the percentage is high enough and most user cases can accept the
>>>>> approximate event time, I'm OK with supporting it for usability,
>>>>> although it doesn't implement event-time semantics strictly.
>>>>>
>>>>> Cheers,
>>>>> Leonard Xu
>>>>
>>>
>>> --
>>>
>>> Best,
>>> Benchao Li