Hi Henry,

These are good questions!
I would rather not add the "temporal" and "lateral" prefixes in front of "join". The temporal table is a concept orthogonal to the join. We should say "join a temporal table" or "join a lateral table".

1. You can of course use a stream-stream join. Introducing the temporal table not only makes the query simpler but also improves performance. More details can be found in [1].
2. Both joins are based on the concept of a temporal table, i.e., a table joins a temporal table.
3. Yes, the join in Flink actually uses a lateral table and a TemporalTableFunction to implement a temporal table. A temporal table is a versioned table, and a lateral table is a table that keeps references to the previous table. If you do not want to use time versioning, you don't need the temporal table.
4. It is a kind of join. The JOIN keyword can be omitted if it is an inner join. The grammar will not be changed in the near future; I haven't heard any news about changing it.
5. Yes, it will be optimized.
6. If you want to left join a temporal table, you can write SQL like:

SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  LEFT JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

CC @Piotr Nowojski <pi...@data-artisans.com>: it would be great to have your opinions here.

Best, Hequn

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table

On Wed, Mar 13, 2019 at 1:59 PM 徐涛 <happydexu...@gmail.com> wrote:

> Hi Hequn,
> Thanks a lot for your answer! That is very helpful for me.
> I still have some questions about the stream and dimension data join and the temporal table join:
> 1. I found that the temporal table join is still a join driven by one stream. I do not know why the dimension data join has to be driven by one stream. Why can it not be done by a two-stream join (a traditional stream-stream join)?
> I will try to give an answer: a two-stream join is based on materializing the data of both streams in state, but due to state retention, the dimension data may be lost. I guess this is one reason, am I correct?
> 2. Is Blink's stream and dimension data join based on the temporal table join?
> 3. I think a lateral table join can also do a dimension join if I do not want to use time versioning. How do I choose between the temporal table join and the lateral table join?
> 4. I found that the temporal table join in Flink uses a "LATERAL TABLE" grammar rather than "JOIN". It is OK, but not as easy to use as "JOIN". Will the community modify the grammar in future releases?
> 5. In the following temporal table join statement, will joining the Orders table with Rates produce too much data before the WHERE clause takes effect? Will it be optimized?
>
> SELECT
>   o.amount * r.rate AS amount
> FROM
>   Orders AS o,
>   LATERAL TABLE (Rates(o.rowtime)) AS r
> WHERE r.currency = o.currency
>
> 6. How do I use the temporal table join to do a left join?
>
> Best
> Henry
>
> On Mar 13, 2019, at 12:02 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>
> Hi Henry,
>
> Yes, you are correct. Basically, there are two ways to join a Temporal Table. One is provided in Flink, and the other is provided in Blink, which has been pushed as a branch [1] in the Flink repo.
>
> - Join a Temporal Table in Flink [2][3][4]
>   As the documentation says, a join with a temporal table joins an append-only table (left input/probe side) with a temporal table (right input/build side), i.e., a table that changes over time and tracks its changes. You need to define a temporal table function, which provides access to the state of a temporal table at a specific point in time. *Both rowtime and proctime are supported.*
> - Join a Temporal Table in Blink [5]
>   Unlike the join in Flink, it can join an *append/upsert/retract* stream (left input/probe side) with a temporal table (right input/build side), i.e., a *remote dimension table* that changes over time. To access data in a temporal table, you need to define a TableSource with LookupableTableSource [6]. (You can probably download the Blink code and take a look at `HBase143TableSource`, which is an implementation of LookupableTableSource.) Currently, only proctime is supported.
>
> I think you can choose one according to your scenario.
> There are some useful examples in the documents listed below. They may be very helpful for you. Feel free to ask if you have any other questions.
>
> Best,
> Hequn
>
> [1] https://github.com/apache/flink/tree/blink
> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
> [3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html
> [4] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#joins
> [5] https://flink-china.org/doc/blink/dev/table/streaming/joins.html#join-with-a-temporal-table
> [6] https://flink-china.org/doc/blink/dev/table/sourcesinks#defining-a-tablesource-with-lookupable
>
> On Tue, Mar 12, 2019 at 2:13 PM 徐涛 <happydexu...@gmail.com> wrote:
>
>> Hi Hequn,
>> I want to implement a join between a stream and a dimension table in Flink SQL. I found a new feature named Temporal Tables, delivered in Flink 1.7, and I think it could be used to achieve the join between a stream and a dimension table. But I am not sure about that. Could anyone help me with it?
>> Thanks a lot for your help.
>>
>> Best
>> Henry
>>
>> On Sep 26, 2018, at 12:16 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>> Hi vino,
>>
>> Thanks for sharing the link. It's a great book and I will take a look.
>> There are several kinds of join, and different joins have different semantics. From the link, I think it means the time-versioned join. FLINK-9712 <https://issues.apache.org/jira/browse/FLINK-9712> covers enrichment joins with Time Versioned Functions, and the result is deterministic under event time.
>>
>> Best, Hequn
>>
>> On Tue, Sep 25, 2018 at 11:05 PM vino yang <yanghua1...@gmail.com> wrote:
>>
>>> Hi Hequn,
>>>
>>> The specific content of the book does not give a right or wrong conclusion, but it illustrates this phenomenon: when two streams with the same input are replayed and joined at the same time, the join results are uncertain because of the order of events. This is because the two streams are intertwined in different forms.
>>> This has nothing to do with orderBy; it is just something that exists in the stream-stream join. Of course, this phenomenon only shows up in comparison with a non-stream join.
>>>
>>> In addition, I recommend this book, which is very famous on Twitter and Amazon. Because you are also Chinese, there is a good translation here. If I guess correctly, the main translator is also from your company. The part I mentioned is here. [1]
>>>
>>> [1]: https://github.com/Vonng/ddia/blob/master/ch11.md#%E8%BF%9E%E6%8E%A5%E7%9A%84%E6%97%B6%E9%97%B4%E4%BE%9D%E8%B5%96%E6%80%A7
>>>
>>> Thanks, vino.
>>>
>>> Hequn Cheng <chenghe...@gmail.com> wrote on Tue, Sep 25, 2018 at 9:45 PM:
>>>
>>>> Hi vino,
>>>>
>>>> There are no ordering problems with the stream-stream join in Flink. No matter in what order the elements arrive, the stream-stream join in Flink will output results that are consistent with standard SQL semantics. I haven't read the book you mentioned. A join doesn't guarantee output order. You have to use orderBy if you want ordered results.
>>>>
>>>> Best, Hequn
>>>>
>>>> On Tue, Sep 25, 2018 at 8:36 PM vino yang <yanghua1...@gmail.com> wrote:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> I may not have stated it clearly here: there is no semantic problem at the Flink implementation level. Rather, there may be "time-dependence" here. [1]
>>>>>
>>>>> Yes, my initial answer was not to use this form of join in this scenario, but Henry said he converted the table into a stream table and asked about the feasibility of other methods.
>>>>>
>>>>> [1]: 《Designing Data-Intensive Applications》 by Martin Kleppmann, Part 3: Derived Data, Chapter 11: Stream Processing, Stream Joins.
>>>>>
>>>>> Some content:
>>>>>
>>>>> *If the ordering of events across streams is undetermined, the join becomes nondeterministic [87], which means you cannot rerun the same job on the same input and necessarily get the same result: the events on the input streams may be interleaved in a different way when you run the job again.*
>>>>>
>>>>> Fabian Hueske <fhue...@gmail.com> wrote on Tue, Sep 25, 2018 at 8:08 PM:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I don't think that using the current join implementation in the Table API / SQL will work.
>>>>>> The non-windowed join fully materializes *both* input tables in state. This is necessary because the join needs to be able to process updates on either side.
>>>>>> While this is not a problem for the fixed-size MySQL table, materializing the append-only table (aka stream) is probably not what you want.
>>>>>> You also cannot limit idle state retention, because it would remove the MySQL table from state at some point.
>>>>>>
>>>>>> The only way to make it work is to use a user-defined TableFunction that queries the MySQL table via JDBC.
>>>>>> However, please note that these calls would be synchronous, blocking calls.
>>>>>>
>>>>>> @Vino: Why do you think that the stream & stream join is not mature, and which problems do you see in the semantics?
>>>>>> The semantics are correct (standard SQL semantics), and in my opinion the implementation is also mature.
>>>>>> However, you should not use the non-windowed join if any of the input tables is ever-growing, because both sides must be held in state. This is not an issue of the semantics.
>>>>>>
>>>>>> Cheers,
>>>>>> Fabian
>>>>>>
>>>>>> On Tue, Sep 25, 2018 at 2:00 PM vino yang <yanghua1...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Henry,
>>>>>>>
>>>>>>> 1) I don't recommend this method very much, but you said that you expect to convert the MySQL table to a stream and then to a Flink table. Under this premise, I said that you can do this by joining two stream tables. But as you know, this join depends on the time period for which the state is kept. To make it equivalent to a dimension table, you must permanently keep the state of the stream table that is defined as a "dimension table." I only said that modifying the relevant configuration in Flink can do this, not that it can be done for a single table.
>>>>>>>
>>>>>>> 2) Imagine that there are one million records in the two tables. The records in both tables are just beginning to stream into Flink, and the records of the dimension table have not fully arrived. Therefore, your matching results may not be as accurate as directly querying MySQL.
>>>>>>>
>>>>>>> In fact, the current stream & stream join is not very mature and there are some problems in its semantics. I personally recommend that you return to a stream/batch (MySQL) join. For more on the principles, I recommend you read a book, referred to as 《DDIA》.
>>>>>>>
>>>>>>> Thanks, vino.
>>>>>>>
>>>>>>> 徐涛 <happydexu...@gmail.com> wrote on Tue, Sep 25, 2018 at 5:48 PM:
>>>>>>>
>>>>>>>> Hi Vino,
>>>>>>>> I do not quite understand some of the sentences below. Would you please explain them in a bit more detail?
>>>>>>>> 1. "*such as setting the state retention time of one of the tables to be permanent*": as far as I know, the state retention time is a global config, so I cannot set this property per table.
>>>>>>>> 2. "*you may not be able to match the results, because the data belonging to the mysql table is just beginning to play as a stream*": why is it not able to match the results?
>>>>>>>>
>>>>>>>> Best
>>>>>>>> Henry
>>>>>>>>
>>>>>>>> On Sep 25, 2018, at 5:29 PM, vino yang <yanghua1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi Henry,
>>>>>>>>
>>>>>>>> If you have converted the MySQL table to a Flink stream table, then in Flink Table/SQL a stream-stream join can also do this, for example by setting the state retention time of one of the tables to be permanent. But when the job has just started running, you may not be able to match the results, because the data belonging to the MySQL table is just beginning to play as a stream.
>>>>>>>>
>>>>>>>> Thanks, vino.
>>>>>>>>
>>>>>>>> 徐涛 <happydexu...@gmail.com> wrote on Tue, Sep 25, 2018 at 5:10 PM:
>>>>>>>>
>>>>>>>>> Hi Vino & Hequn,
>>>>>>>>> I am now using the Table/SQL API. If I import the MySQL table as a stream and then convert it into a table, it seems that this can also be a workaround for batch/streaming joining. May I ask how this differs from the UDTF method? Does this implementation have any defects?
>>>>>>>>> Best
>>>>>>>>> Henry
>>>>>>>>>
>>>>>>>>> On Sep 22, 2018, at 10:28 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi
>>>>>>>>>
>>>>>>>>> +1 for vino's answer.
>>>>>>>>> Also, this kind of join will be supported in FLINK-9712 <https://issues.apache.org/jira/browse/FLINK-9712>. You can check more details in the JIRA.
>>>>>>>>>
>>>>>>>>> Best, Hequn
>>>>>>>>>
>>>>>>>>> On Fri, Sep 21, 2018 at 4:51 PM vino yang <yanghua1...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Henry,
>>>>>>>>>>
>>>>>>>>>> There are three ways I can think of:
>>>>>>>>>>
>>>>>>>>>> 1) use the DataStream API and implement a flatMap UDF to access the dimension table;
>>>>>>>>>> 2) use the Table/SQL API and implement a UDTF to access the dimension table;
>>>>>>>>>> 3) customize the implementation of the Table/SQL join API/statement (and change the physical plan).
>>>>>>>>>>
>>>>>>>>>> Thanks, vino.
>>>>>>>>>>
>>>>>>>>>> 徐涛 <happydexu...@gmail.com> wrote on Fri, Sep 21, 2018 at 4:43 PM:
>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>> Sometimes a "dimension table" needs to be joined with the "fact table" if the data are not joined before being sent to Kafka.
>>>>>>>>>>> So if the data are joined in Flink, does the "dimension table" have to be imported as a stream, or are there other ways to achieve it?
>>>>>>>>>>> Thanks a lot!
>>>>>>>>>>>
>>>>>>>>>>> Best
>>>>>>>>>>> Henry
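
For readers following the thread, the "as of" lookup behind a join with a temporal table can be sketched in plain Python. This is only an illustration of the semantics discussed above, not Flink or Blink code: `TemporalTable`, `as_of`, and `temporal_join` are hypothetical names, the sample rates and orders are made up, and updates for each key are assumed to arrive in time order. Only the order (probe) side drives the output, and the `left` flag mimics the LEFT JOIN case by emitting a row with empty rate fields when no version exists yet.

```python
from bisect import bisect_right
from collections import defaultdict


class TemporalTable:
    """Hypothetical versioned table: every update is kept with its timestamp."""

    def __init__(self):
        self._times = defaultdict(list)   # key -> update times, ascending
        self._values = defaultdict(list)  # key -> values, parallel to _times

    def upsert(self, key, time, value):
        # Assumption: updates for one key arrive in non-decreasing time order.
        self._times[key].append(time)
        self._values[key].append(value)

    def as_of(self, key, time):
        """Return the version of `key` valid at `time`, or None if none exists."""
        times = self._times.get(key, [])
        i = bisect_right(times, time)  # number of versions with timestamp <= time
        return self._values[key][i - 1] if i else None


def temporal_join(orders, rates, left=False):
    """Join each order with the rate version valid at the order's own time."""
    out = []
    for o in orders:
        rate = rates.as_of(o["currency"], o["time"])
        if rate is not None:
            out.append((o["amount"], o["currency"], rate, o["amount"] * rate))
        elif left:
            # Left join: keep the order even though no rate version matches.
            out.append((o["amount"], o["currency"], None, None))
    return out


rates = TemporalTable()
rates.upsert("EUR", 1, 114)
rates.upsert("EUR", 5, 116)

orders = [
    {"amount": 2, "currency": "EUR", "time": 2},
    {"amount": 3, "currency": "EUR", "time": 6},
    {"amount": 5, "currency": "JPY", "time": 3},
]

inner_rows = temporal_join(orders, rates)
left_rows = temporal_join(orders, rates, left=True)
```

In this sketch the first EUR order picks up the rate written at time 1 and the second picks up the update written at time 5, while the JPY order appears only in `left_rows`. The order stream itself is never stored, which is the contrast with the fully materialized stream-stream join described earlier in the thread.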