I mistyped the rejected_query; it should be:

CREATE VIEW post_agg_stream AS
SELECT currencyId, AVG(rate) AS rate
FROM currency_rates
GROUP BY currencyId

CREATE VIEW rejected_query AS
SELECT ...
FROM transactions AS t
JOIN post_agg_stream FOR SYSTEM_TIME AS OF t.transactionTime AS r
    ON r.currencyId = t.currencyId
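(A contrasting sketch: a view that only filters or projects the CDC
stream, with no aggregation, would still be accepted as a temporal
table under the proposal quoted below. The view names filtered_rates
and still_working_query are hypothetical:)

CREATE VIEW filtered_rates AS
SELECT currencyId, rate      -- projection only
FROM currency_rates
WHERE rate > 0               -- plain filter, no aggregation

CREATE VIEW still_working_query AS
SELECT ...
FROM transactions AS t
JOIN filtered_rates FOR SYSTEM_TIME AS OF t.transactionTime AS r
    ON r.currencyId = t.currencyId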
On Mon, Jul 6, 2020 at 11:29 AM Seth Wiesman <[email protected]> wrote:

> Hey Leonard,
>
> Agreed, this is a fun discussion!
>
>> (1) For changelog sources backed by CDC tools, one problem is whether
>> we can use the temporal table as a general source table that may be
>> followed by some aggregation operations; more precisely, whether the
>> aggregation operator can use a DELETE record whose operation time we
>> just "corrected" to retract a record. Maybe not. This pulls us back
>> to the discussion of operation time vs. event time, a really cool but
>> complicated topic; see the discussion above from me and @Jark.
>
> I fully agree this is a complicated topic; however, I don't think it's
> actually a problem that needs to be solved for the first version of
> this feature. My proposal is to disallow using upsert streams as
> temporal tables if an aggregation operation has been applied. Going
> back to my notion that temporal tables are a tool for performing
> streaming star schema denormalization, the dimension table in a star
> schema is rarely aggregated pre-join. In the case of a CDC stream of
> currency rates joined to transactions, the CDC stream only needs to
> support filter pushdowns and map-like transformations before being
> joined. I believe this is a reasonable limitation we can impose that
> will unblock a large percentage of use cases, and once we better
> understand the semantics of the correct operation time in a
> retraction, the limitation can be removed in future versions while
> remaining backward compatible.
>
> CREATE TABLE currency_rates (
>     currencyId BIGINT PRIMARY KEY,
>     rate DECIMAL(10, 2)
> ) WITH (
>     'connector' = 'kafka',
>     'format' = 'debezium-json'
> )
>
> CREATE TABLE transactions (
>     currencyId BIGINT,
>     transactionTime TIMESTAMP(3)
> ) WITH (
> )
>
> -- Under my proposal this query would be supported because the
> -- currency_rates table is used in a temporal join without any
> -- aggregations having been applied
>
> CREATE VIEW working_query AS
> SELECT ...
> FROM transactions AS t
> JOIN currency_rates FOR SYSTEM_TIME AS OF t.transactionTime AS r
>     ON r.currencyId = t.currencyId
>
> -- However, this query would be rejected by the planner until we
> -- determine the proper time semantics of a retraction
>
> CREATE VIEW post_agg_stream AS
> SELECT currencyId, AVG(rate) AS rate
> FROM currency_rates
> GROUP BY currencyId
>
> CREATE VIEW rejected_query AS
> SELECT ...
> FROM transactions AS t
> JOIN currency_rates FOR SYSTEM_TIME AS OF t.transactionTime AS r
>     ON r.currencyId = t.currencyId
>
>> (2) For an upsert source that defines a PRIMARY KEY and may contain
>> multiple records under a PK, the main problem is the PK semantic:
>> multiple records under a PK break the unique semantic of the table.
>> We need to work around this by (a) adding another primary key keyword
>> and explaining the upsert semantic, or (b) creating the temporal
>> table based on a view that is the deduplicated result of the source
>> table [2].
>
> This feels like more of a bikeshedding question than a blocker, and I
> look forward to seeing what you come up with!
>
> Seth
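(A minimal sketch of option (b), following the deduplication pattern
documented in [2]; the source table name currency_rates_upsert and its
ts column are hypothetical, and whether the planner then accepts the
view as a temporal table is exactly the open question of this thread:)

CREATE VIEW deduplicated_rates AS
SELECT currencyId, rate, ts
FROM (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY currencyId   -- the PK of the upsert stream
            ORDER BY ts DESC) AS row_num
    FROM currency_rates_upsert)
WHERE row_num = 1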
> On Mon, Jul 6, 2020 at 10:59 AM Benchao Li <[email protected]> wrote:
>
>> Hi everyone,
>>
>> Thanks a lot for the great discussions so far.
>>
>> After reading through the long discussion, I still have one question.
>> Currently the temporal table function supports both event-time and
>> processing-time joins. If we use the "FOR SYSTEM_TIME AS OF" syntax
>> without the "TEMPORAL" keyword in DDL, does it mean we can only use
>> the temporal table function join with event time? If so, how do we
>> distinguish it from the current temporal table (also known as a
>> dimension table)?
>>
>> Maybe I'm missing something here. Correct me if I'm wrong.
>>
>> Leonard Xu <[email protected]> wrote on Mon, Jul 6, 2020 at 11:34 PM:
>>
>>> Hi, Seth
>>>
>>> Thanks for your explanation of the use cases, and you're right: the
>>> lookup join/table is one kind of temporal table join/table that
>>> tracks the latest snapshot of an external DB-like table, which is
>>> why we proposed using the same temporal join syntax.
>>>
>>> In fact, I have investigated the Debezium and Canal formats further
>>> these days, and we can obtain the exact DML operation time from
>>> their meta information, which comes from the DB binlog. Although
>>> extracting meta information from a record is part of the FLIP-107
>>> scope [1], at least we have a way to extract the correct operation
>>> time. Yet even if we can obtain the expected operation time, there
>>> are still some problems.
>>>
>>> (1) For changelog sources backed by CDC tools, one problem is
>>> whether we can use the temporal table as a general source table
>>> that may be followed by some aggregation operations; more
>>> precisely, whether the aggregation operator can use a DELETE record
>>> whose operation time we just "corrected" to retract a record.
>>> Maybe not. This pulls us back to the discussion of operation time
>>> vs. event time, a really cool but complicated topic; see the
>>> discussion above from me and @Jark.
>>>
>>> (2) For an upsert source that defines a PRIMARY KEY and may contain
>>> multiple records under a PK, the main problem is the PK semantic:
>>> multiple records under a PK break the unique semantic of the table.
>>> We need to work around this by (a) adding another primary key
>>> keyword and explaining the upsert semantic, or (b) creating the
>>> temporal table based on a view that is the deduplicated result of
>>> the source table [2].
>>>
>>> I'm working on (2), and if we want to support (1), i.e. support
>>> DELETE entirely, that's really a big challenge, but I also think
>>> it's the right thing for the long term.
>>>
>>> If we decide to do (1), we first need to introduce the operation
>>> time concept, then change the codebase in many places to deal with
>>> the operation time header, and finally explain to users how to
>>> understand and use temporal tables.
>>>
>>> I'm a little worried about whether it's valuable enough; I proposed
>>> only supporting (2) because it is a good replacement for the
>>> current temporal table function and will not introduce more
>>> concepts and work.
>>>
>>> Jark, Jingsong, Konstantin, WDYT?
>>>
>>> Best,
>>> Leonard Xu
>>>
>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Accessread-onlymetadatae.g.partition
>>> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
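(For reference, a minimal sketch of the kind of DDL that could expose
such operation-time metadata once FLIP-107 [1] is available. The
METADATA column syntax and the 'value.source.timestamp' key follow the
FLIP-107 proposal and are assumptions here, not a feature that existed
at the time of this thread:)

CREATE TABLE currency_rates (
    currencyId BIGINT PRIMARY KEY,
    rate DECIMAL(10, 2),
    -- hypothetical FLIP-107-style metadata column carrying the DML
    -- operation time taken from the Debezium source timestamp
    operation_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL
) WITH (
    'connector' = 'kafka',
    'format' = 'debezium-json'
)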
>>> > On Jul 6, 2020, at 22:02, Seth Wiesman <[email protected]> wrote:
>>> >
>>> > As an aside, I conceptually view temporal table joins as
>>> > semantically equivalent to lookup table joins. They are just two
>>> > different ways of consuming the same data.
>>> >
>>> > Seth
>>> >
>>> > On Mon, Jul 6, 2020 at 8:56 AM Seth Wiesman <[email protected]> wrote:
>>> >
>>> > Hi Leonard,
>>> >
>>> > Regarding DELETE operations I tend to have the opposite reaction.
>>> > I spend a lot of time working with production Flink users across
>>> > a large number of organizations, and to say we don't support
>>> > temporal tables that include DELETEs would be a blocker for
>>> > adoption. Even organizations that claim to never delete rows
>>> > still occasionally do so per GDPR requests or other regulations.
>>> >
>>> > I actually do think users will understand the limitations. Flink
>>> > today has a very clear value proposition around correctness: your
>>> > results are as correct as the input data provided. This does not
>>> > change under support for DELETE records. Flink provides the most
>>> > correct results possible given the resolution of the fields
>>> > generated by third-party systems. As Debezium and other CDC
>>> > libraries become more accurate, so will Flink.
>>> >
>>> > Seth
>>> >
>>> > On Fri, Jul 3, 2020 at 11:00 PM Leonard Xu <[email protected]> wrote:
>>> >
>>> > Hi, Konstantin
>>> >
>>> >> Would we support a temporal join with a changelog stream with
>>> >> event time semantics by ignoring DELETE messages, or would it be
>>> >> completely unsupported?
>>> >
>>> > I don't know what percentage of temporal scenarios need this
>>> > feature.
>>> >
>>> > As for supporting an approximate event-time join, either by
>>> > ignoring DELETE messages or by extracting an approximate event
>>> > time for DELETE messages, I'm not sure this is acceptable for
>>> > users even if we explain the limitations of such a join. I tend
>>> > to not support this feature, because we cannot ensure event-time
>>> > semantics and it may lead to incorrect results for users in some
>>> > scenarios.
>>> >
>>> > If the percentage is high enough and most use cases can accept
>>> > the approximate event time, I'm OK with supporting it for
>>> > usability, although it does not strictly implement event-time
>>> > semantics.
>>> >
>>> > Cheers,
>>> > Leonard Xu
>>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>
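(To make Seth's aside about lookup joins concrete: a processing-time
temporal join against a lookup source uses the same FOR SYSTEM_TIME AS
OF syntax, anchored to a processing-time attribute instead of an event
time. A minimal sketch; the latest_rates lookup table and the
proc_time column, declared as proc_time AS PROCTIME() on transactions,
are hypothetical:)

SELECT t.currencyId, t.transactionTime, r.rate
FROM transactions AS t
JOIN latest_rates FOR SYSTEM_TIME AS OF t.proc_time AS r
    ON r.currencyId = t.currencyId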
