Hi Henry,

1. Also take a look at the limitations of regular joins <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#regular-joins>:

> However, this operation has an important implication: it requires to keep both sides of the join input in Flink’s state forever. Thus, the resource usage will grow indefinitely as well, if one or both input tables are continuously growing.

4. Our current grammar for temporal table joins is a stopgap solution that is ANSI SQL compliant. Unfortunately, the SQL standard lags behind streaming requirements, and we are working on addressing this issue. [1]

5. It will be executed much as you would expect a regular hash join to be executed: the “WHERE” join condition will be pushed into the temporal table join operator.

6. I don’t think that Flink supports the syntax suggested by Hequn. Currently outer joins are not supported with temporal tables.

Piotrek

[1] https://issues.apache.org/jira/browse/CALCITE-1917

> On 14 Mar 2019, at 03:31, Hequn Cheng <chenghe...@gmail.com> wrote:
>
> Hi Henry,
>
> These are good questions!
> I would rather not add the temporal and lateral prefixes in front of the join. The temporal table is a concept orthogonal to the join. We should say "join a temporal table" or "join a lateral table".
> 1. You can of course use a stream-stream join. Introducing the temporal table not only makes the query simpler but also improves performance. More detail can be found in [1].
> 2. Both joins are based on the concept of a temporal table, i.e., a table joins a temporal table.
> 3. Yes, actually the join in Flink uses a lateral table and a TemporalTableFunction to implement a temporal table. A temporal table is a versioned table, and a lateral table is a table that keeps references to the previous table. If you do not want to use time versioning, you don't need the temporal table.
> 4. It is a kind of join. The join keyword can be omitted if it is an inner join. The grammar will not be changed in the near future; I haven't heard any news about changing it.
> 5. Yes, it will be optimized.
> 6. If you want to left join a temporal table, you can write SQL like:
>
>     SELECT
>       o.amount, o.currency, r.rate, o.amount * r.rate
>     FROM
>       Orders AS o
>       LEFT JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
>       ON r.currency = o.currency
>
> CC @Piotr Nowojski <pi...@data-artisans.com> Would be great to have your opinions here.
>
> Best,
> Hequn
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>
> On Wed, Mar 13, 2019 at 1:59 PM 徐涛 <happydexu...@gmail.com> wrote:
> Hi Hequn,
> Thanks a lot for your answer! That is very helpful for me.
> I still have some questions about the stream and dimension data join and the temporal table join:
> 1. I found that the temporal table join is still a join driven by one stream. I do not know why the dimension data join has to be driven by one stream; why can it not be done as a two-stream join (a traditional stream-stream join)?
>    Let me try to answer it myself: a two-stream join works by materializing the data of both streams in state, but due to state retention the dimension data may be lost. I guess this is one reason; am I correct?
> 2. Is Blink's stream and dimension data join based on the temporal table join?
> 3. I think a lateral table join can also do a dimension join if I do not want to use time versioning. How do I choose between a temporal table join and a lateral table join?
> 4. I found that the temporal table join in Flink uses a “LATERAL TABLE” grammar rather than “JOIN”. That is OK, but it is not as easy to use as “JOIN”. Will the community modify the grammar in future releases?
> 5. In the following temporal table join statement, will joining the Orders table with Rates produce too much data before the WHERE clause takes effect? Will it be optimized?
>
>     SELECT
>       o.amount * r.rate AS amount
>     FROM
>       Orders AS o,
>       LATERAL TABLE (Rates(o.rowtime)) AS r
>     WHERE r.currency = o.currency
>
> 6. How do I use a temporal table join to do a left join?
>
> Best
> Henry
>
>> On Mar 13, 2019, at 12:02 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>> Hi Henry,
>>
>> Yes, you are correct. Basically, there are two ways to join a temporal table. One is provided in Flink and the other is provided in Blink, which has been pushed as a branch [1] in the Flink repo.
>>
>> - Join a Temporal Table in Flink [2][3][4]
>>   As the documentation says, a join with a temporal table joins an append-only table (left input/probe side) with a temporal table (right input/build side), i.e., a table that changes over time and tracks its changes. You need to define a temporal table function, which will be used to provide access to the state of a temporal table at a specific point in time. *Both rowtime and proctime are supported.*
>> - Join a Temporal Table in Blink [5]
>>   Unlike the join in Flink, it can join an *append/upsert/retract* stream (left input/probe side) with a temporal table (right input/build side), i.e., a *remote dimension table* that changes over time. In order to access data in a temporal table, you need to define a TableSource with LookupableTableSource [6] (you can probably download the Blink code and take a look at `HBase143TableSource`, which is an implementation of LookupableTableSource). Currently, only proctime is supported.
>>
>> I think you can choose one according to your scenario.
>> There are some useful examples in the documents I list below. They may be very helpful for you. Feel free to ask if you have any other questions.
>>
>> Best,
>> Hequn
>>
>> [1] https://github.com/apache/flink/tree/blink
>> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>> [3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html
>> [4] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#joins
>> [5] https://flink-china.org/doc/blink/dev/table/streaming/joins.html#join-with-a-temporal-table
>> [6] https://flink-china.org/doc/blink/dev/table/sourcesinks#defining-a-tablesource-with-lookupable
>>
>> On Tue, Mar 12, 2019 at 2:13 PM 徐涛 <happydexu...@gmail.com> wrote:
>> Hi Hequn,
>> I want to implement a stream join with a dimension table in Flink SQL. I found there is a new feature named Temporal Tables delivered in Flink 1.7, and I think it could perhaps be used to achieve the join between a stream and a dimension table. But I am not sure about that. Could anyone help me with it?
>> Thanks a lot for your help.
>>
>> Best
>> Henry
>>
>>> On Sep 26, 2018, at 12:16 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>
>>> Hi vino,
>>>
>>> Thanks for sharing the link. It's a great book and I will take a look.
>>> There are several kinds of join, and different joins have different semantics. From the link, I think it means the time-versioned join. FLINK-9712 <https://issues.apache.org/jira/browse/FLINK-9712> is about enrichment joins with Time Versioned Functions, and the result is deterministic under event time.
>>>
>>> Best, Hequn
>>>
>>> On Tue, Sep 25, 2018 at 11:05 PM vino yang <yanghua1...@gmail.com> wrote:
>>> Hi Hequn,
>>>
>>> The book does not draw a right-or-wrong conclusion, but it illustrates this phenomenon: when the same two input streams are replayed and joined at the same time, the join results are nondeterministic because of the order of events. This is because the two streams may be interleaved in different ways. This has nothing to do with orderBy; it simply exists in the stream-stream join. Of course, this is only a statement in comparison with a non-stream join.
>>>
>>> In addition, I recommend this book, which is very well known on Twitter and Amazon. Because you are also Chinese, there is a good translation here. If I guess correctly, the main translator is also from your company. The part I mentioned is here. [1]
>>>
>>> [1]: https://github.com/Vonng/ddia/blob/master/ch11.md#%E8%BF%9E%E6%8E%A5%E7%9A%84%E6%97%B6%E9%97%B4%E4%BE%9D%E8%B5%96%E6%80%A7
>>>
>>> Thanks, vino.
>>>
>>> On Tue, Sep 25, 2018 at 9:45 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>>> Hi vino,
>>>
>>> There are no ordering problems with the stream-stream join in Flink. No matter what order the elements arrive in, the stream-stream join in Flink will output results which are consistent with standard SQL semantics. I haven't read the book you mentioned. A join doesn't guarantee output order. You have to use orderBy if you want ordered results.
>>>
>>> Best, Hequn
>>>
>>> On Tue, Sep 25, 2018 at 8:36 PM vino yang <yanghua1...@gmail.com> wrote:
>>> Hi Fabian,
>>>
>>> I may not have stated it clearly here: there is no semantic problem at the Flink implementation level. Rather, there may be “time-dependence” here. [1]
>>>
>>> Yes, my initial answer was not to use this form of join in this scenario, but Henry said he converted the table into a stream table and asked about the feasibility of other methods.
>>>
>>> [1]: "Designing Data-Intensive Applications" by Martin Kleppmann, Part 3: Derived Data, Chapter 11: Stream Processing, Stream Joins.
>>>
>>> Some content:
>>> If the ordering of events across streams is undetermined, the join becomes nondeterministic [87], which means you cannot rerun the same job on the same input and necessarily get the same result: the events on the input streams may be interleaved in a different way when you run the job again.
>>>
>>> On Tue, Sep 25, 2018 at 8:08 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>> Hi,
>>>
>>> I don't think that using the current join implementation in the Table API / SQL will work.
>>> The non-windowed join fully materializes *both* input tables in state. This is necessary because the join needs to be able to process updates on either side.
>>> While this is not a problem for the fixed-size MySQL table, materializing the append-only table (aka stream) is probably not what you want.
>>> You also cannot limit idle state retention, because it would remove the MySQL table from state at some point.
>>>
>>> The only way to make it work is to use a user-defined TableFunction that queries the MySQL table via JDBC.
>>> However, please note that these calls would be synchronous, blocking calls.
>>>
>>> @Vino: Why do you think that the stream & stream join is not mature, and which problems do you see in the semantics?
>>> The semantics are correct (standard SQL semantics) and in my opinion the implementation is also mature.
>>> However, you should not use the non-windowed join if any of the input tables is ever-growing, because both sides must be held in state. This is not an issue of the semantics.
>>>
>>> Cheers,
>>> Fabian
>>>
>>> On Tue, Sep 25, 2018 at 2:00 PM vino yang <yanghua1...@gmail.com> wrote:
>>> Hi Henry,
>>>
>>> 1) I don't recommend this method very much, but you said that you expect to convert the MySQL table to a stream and then to a Flink table. Under this premise, I said that you can do this by joining two stream tables. But as you know, this join depends on the time period for which the state is kept. To make it equivalent to a dimension table, you must permanently keep the state of the stream table that is defined as the "dimension table". I only meant that this can be done by modifying the relevant configuration in Flink, not for a single table.
>>>
>>> 2) Imagine that there are one million records in two tables. The records of both tables are just beginning to stream into Flink, and the records of the dimension table have not fully arrived. Therefore, your matching results may not be as accurate as directly querying MySQL.
>>>
>>> In fact, the current stream & stream join is not very mature and there are some problems in its semantics. I personally recommend that you return to a stream/batch (MySQL) join. For more on the principles, I recommend you read a book, referred to as "DDIA".
>>>
>>> Thanks, vino.
>>>
>>> On Tue, Sep 25, 2018 at 5:48 PM 徐涛 <happydexu...@gmail.com> wrote:
>>> Hi Vino,
>>> I do not quite understand some of the sentences below. Would you please explain them in a bit more detail?
>>> 1. “such as setting the state retention time of one of the tables to be permanent”: as far as I know, the state retention time is a global config, and I cannot set this property per table.
>>> 2. "you may not be able to match the results, because the data belonging to the mysql table is just beginning to play as a stream": why is it not able to match the results?
>>>
>>> Best
>>> Henry
>>>
>>>> On Sep 25, 2018, at 5:29 PM, vino yang <yanghua1...@gmail.com> wrote:
>>>>
>>>> Hi Henry,
>>>>
>>>> If you have converted the MySQL table to a Flink stream table, a stream-stream join in Flink Table/SQL can also do this, for example by setting the state retention time of one of the tables to be permanent. But when the job has just started running, you may not be able to match the results, because the data belonging to the MySQL table is just beginning to play as a stream.
>>>>
>>>> Thanks, vino.
>>>>
>>>> On Tue, Sep 25, 2018 at 5:10 PM 徐涛 <happydexu...@gmail.com> wrote:
>>>> Hi Vino & Hequn,
>>>> I am now using the Table/SQL API. If I import the MySQL table as a stream and then convert it into a table, it seems that this can also be a workaround for batch/streaming joining. May I ask what the difference from the UDTF method is? Does this implementation have any defects?
>>>>
>>>> Best
>>>> Henry
>>>>
>>>>> On Sep 22, 2018, at 10:28 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>>
>>>>> Hi
>>>>>
>>>>> +1 for vino's answer.
>>>>> Also, this kind of join will be supported in FLINK-9712 <https://issues.apache.org/jira/browse/FLINK-9712>. You can check more details in the JIRA.
>>>>>
>>>>> Best, Hequn
>>>>>
>>>>> On Fri, Sep 21, 2018 at 4:51 PM vino yang <yanghua1...@gmail.com> wrote:
>>>>> Hi Henry,
>>>>>
>>>>> There are three ways I can think of:
>>>>>
>>>>> 1) use the DataStream API and implement a flatMap UDF to access the dimension table;
>>>>> 2) use the Table/SQL API and implement a UDTF to access the dimension table;
>>>>> 3) customize the implementation of the Table/SQL join API/statement (and change the physical plan).
>>>>>
>>>>> Thanks, vino.
>>>>>
>>>>> On Fri, Sep 21, 2018 at 4:43 PM 徐涛 <happydexu...@gmail.com> wrote:
>>>>> Hi All,
>>>>> Sometimes a “dimension table” needs to be joined with the "fact table", if the data are not joined before being sent to Kafka.
>>>>> So if the data are joined in Flink, does the “dimension table” have to be imported as a stream, or are there other ways to achieve it?
>>>>> Thanks a lot!
>>>>>
>>>>> Best
>>>>> Henry
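
To make the temporal table join semantics discussed in this thread concrete, here is a minimal sketch in plain Python. It is not Flink code and uses no Flink API: `Rate`, `Order`, `build_versions` and `temporal_join` are hypothetical names chosen for illustration, and the sample data is made up. The sketch assumes event-time semantics, where each order is matched with the rate version that was valid at the order's timestamp. It models an inner join, so an order with no valid version yet is dropped.

```python
from bisect import bisect_right
from collections import defaultdict
from typing import NamedTuple


class Rate(NamedTuple):
    time: int        # time from which this version of the rate is valid
    currency: str
    rate: float


class Order(NamedTuple):
    time: int        # event time of the order
    currency: str
    amount: float


def build_versions(rates):
    """Group rate versions by currency, sorted by validity start time."""
    versions = defaultdict(list)
    for r in sorted(rates, key=lambda r: r.time):
        versions[r.currency].append(r)
    return versions


def temporal_join(orders, rates):
    """Inner-join each order with the rate version valid at the order's time."""
    versions = build_versions(rates)
    result = []
    for o in orders:
        history = versions.get(o.currency, [])
        times = [r.time for r in history]
        # Number of versions whose start time is <= the order's time.
        idx = bisect_right(times, o.time)
        if idx == 0:
            continue  # no version valid yet: the inner join drops this order
        result.append((o.currency, o.amount * history[idx - 1].rate))
    return result


# Hypothetical sample data, not taken from the thread.
rates = [Rate(1, "EUR", 1.25), Rate(5, "EUR", 1.5), Rate(2, "JPY", 0.5)]
orders = [Order(3, "EUR", 10.0), Order(6, "EUR", 10.0), Order(1, "JPY", 100.0)]
converted = temporal_join(orders, rates)
print(converted)
```

With this sample data the two EUR orders pick up different rate versions (1.25 and 1.5), and the JPY order is dropped because its rate only becomes valid later. Only the versioned table is held as state here and the orders are processed one at a time, which is the contrast with the regular join described above, where both sides are materialized.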