Hi Henry,

Yes, you are correct. Basically, there are two ways to join a temporal table: one is provided in Flink, and the other is provided in Blink, which has been pushed as a branch[1] to the Flink repo.
- Join a Temporal Table in Flink[2][3][4]
  As the documentation says, a join with a temporal table joins an append-only table (left input/probe side) with a temporal table (right input/build side), i.e., a table that changes over time and tracks its changes. You need to define a temporal table function, which provides access to the state of the temporal table at a specific point in time. *Both rowtime and proctime are supported.*

- Join a Temporal Table in Blink[5]
  Unlike the join in Flink, it can join an *append/upsert/retract* stream (left input/probe side) with a temporal table (right input/build side), i.e., a *remote dimension table* that changes over time. To access data in the temporal table, you need to define a TableSource that implements LookupableTableSource[6] (you can download the Blink code and take a look at `HBase143TableSource`, which is an implementation of LookupableTableSource). Currently, only proctime is supported.

I think you can choose one according to your scenario. The documents listed below contain some useful examples, which may be very helpful for you. Feel free to ask if you have any other questions.
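To make the difference between the two semantics a bit more concrete, here is a minimal sketch in plain Python. It is not Flink API code. The hypothetical `VersionedTable.as_of` plays the role of a temporal table function: it returns the version of a row that was valid at the probe row's time. The hypothetical `proctime_lookup_join` mimics a processing-time lookup, which always sees the dimension table's current content. The orders/rates data is made up, and the sketch assumes that all rate versions up to an order's time have already arrived (in a real event-time job, watermarks take care of this).

```python
import bisect
from typing import Dict, Iterable, Iterator, List, Optional, Tuple

Order = Tuple[int, str, int]  # hypothetical schema: (order_time, currency, amount)


class VersionedTable:
    """Hypothetical stand-in for a temporal table: keeps every version per key."""

    def __init__(self) -> None:
        self._times: Dict[str, List[int]] = {}
        self._values: Dict[str, List[int]] = {}

    def upsert(self, key: str, version_time: int, value: int) -> None:
        # Keep versions sorted by time, even if updates arrive out of order.
        times = self._times.setdefault(key, [])
        values = self._values.setdefault(key, [])
        i = bisect.bisect_right(times, version_time)
        times.insert(i, version_time)
        values.insert(i, value)

    def as_of(self, key: str, time: int) -> Optional[int]:
        # Latest version whose time is <= the probe time, similar to a
        # temporal table function evaluated at the probe row's time attribute.
        times = self._times.get(key, [])
        i = bisect.bisect_right(times, time)
        return self._values[key][i - 1] if i > 0 else None


def event_time_join(orders: Iterable[Order], rates: VersionedTable) -> Iterator[Tuple[int, str, int]]:
    for t, currency, amount in orders:
        rate = rates.as_of(currency, t)
        if rate is not None:  # inner join: orders without a valid rate are dropped
            yield (t, currency, amount * rate)


def proctime_lookup_join(orders: Iterable[Order], current_rates: Dict[str, int]) -> Iterator[Tuple[int, str, int]]:
    # Processing-time lookup: every order sees whatever the dimension table
    # holds at the moment of the lookup, regardless of the order's timestamp.
    for t, currency, amount in orders:
        rate = current_rates.get(currency)
        if rate is not None:
            yield (t, currency, amount * rate)


if __name__ == "__main__":
    rates = VersionedTable()
    for version_time, rate in [(9, 119), (1, 114), (5, 116)]:  # out-of-order updates
        rates.upsert("Euro", version_time, rate)
    orders = [(2, "Euro", 2), (6, "Euro", 1), (10, "Euro", 3), (3, "Yen", 50)]
    print(list(event_time_join(orders, rates)))
    # [(2, 'Euro', 228), (6, 'Euro', 116), (10, 'Euro', 357)]
    print(list(proctime_lookup_join(orders, {"Euro": 119})))
    # [(2, 'Euro', 238), (6, 'Euro', 119), (10, 'Euro', 357)]
```

The event-time join prices each order with the rate that was valid when the order happened, even though the rate updates were inserted out of order, so rerunning it on the same input gives the same result. The processing-time lookup prices every order with whatever rate is current at lookup time.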
Best,
Hequn

[1] https://github.com/apache/flink/tree/blink
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
[3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html
[4] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#joins
[5] https://flink-china.org/doc/blink/dev/table/streaming/joins.html#join-with-a-temporal-table
[6] https://flink-china.org/doc/blink/dev/table/sourcesinks#defining-a-tablesource-with-lookupable

On Tue, Mar 12, 2019 at 2:13 PM 徐涛 <happydexu...@gmail.com> wrote:

> Hi Hequn,
> I want to join a stream with a dimension table in Flink SQL. I found a new feature named Temporal Tables delivered in Flink 1.7, and I think it may be usable for joining a stream with a dimension table, but I am not sure about that. Could anyone help me with it?
> Thanks a lot for your help.
>
> Best
> Henry
>
> On Sep 26, 2018, at 12:16 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>
> Hi vino,
>
> Thanks for sharing the link. It's a great book and I will take a look.
> There are several kinds of joins, and different joins have different semantics. From the link, I think it means the time-versioned join. FLINK-9712 <https://issues.apache.org/jira/browse/FLINK-9712> supports enrichment joins with Time Versioned Functions, and the result is deterministic under event time.
>
> Best, Hequn
>
> On Tue, Sep 25, 2018 at 11:05 PM vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi Hequn,
>>
>> The specific content of the book does not give a right-or-wrong conclusion, but it illustrates this phenomenon: when two streams with the same input are replayed and joined at the same time, the join results are nondeterministic because of the order of events. This is because the two streams can be interleaved in different ways. This has nothing to do with orderBy; it simply exists in stream-stream joins.
>> Of course, this phenomenon is only described in comparison with a non-stream join.
>>
>> In addition, I recommend this book, which is very famous on Twitter and Amazon. Since you are also Chinese: there is a good Chinese translation. If I guess correctly, the main translator is also from your company. The part I mentioned is here.[1]
>>
>> [1]: https://github.com/Vonng/ddia/blob/master/ch11.md#%E8%BF%9E%E6%8E%A5%E7%9A%84%E6%97%B6%E9%97%B4%E4%BE%9D%E8%B5%96%E6%80%A7
>>
>> Thanks, vino.
>>
>> On Tue, Sep 25, 2018 at 9:45 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>>> Hi vino,
>>>
>>> There are no ordering problems with stream-stream joins in Flink. No matter in what order the elements arrive, a stream-stream join in Flink outputs results consistent with standard SQL semantics. I haven't read the book you mentioned. A join doesn't guarantee output order; you have to apply orderBy if you want ordered results.
>>>
>>> Best, Hequn
>>>
>>> On Tue, Sep 25, 2018 at 8:36 PM vino yang <yanghua1...@gmail.com> wrote:
>>>
>>>> Hi Fabian,
>>>>
>>>> I may not have expressed it clearly: there is no semantic problem at the Flink implementation level. Rather, there may be "time dependence" here.[1]
>>>>
>>>> Yes, my initial answer was not to use this form of join in this scenario, but Henry said he converted the table into a stream table and asked about the feasibility of other methods.
>>>>
>>>> [1]: "Designing Data-Intensive Applications" by Martin Kleppmann, Part 3: Derived Data, Chapter 11: Stream Processing, Stream Joins.
>>>>
>>>> Some content:
>>>>
>>>> *If the ordering of events across streams is undetermined, the join becomes nondeterministic [87], which means you cannot rerun the same job on the same input and necessarily get the same result: the events on the input streams may be interleaved in a different way when you run the job again.*
>>>>
>>>> On Tue, Sep 25, 2018 at 8:08 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I don't think that using the current join implementation in the Table API / SQL will work.
>>>>> The non-windowed join fully materializes *both* input tables in state. This is necessary because the join needs to be able to process updates on either side.
>>>>> While this is not a problem for the fixed-size MySQL table, materializing the append-only table (aka stream) is probably not what you want.
>>>>> You also cannot limit idle state retention, because it would remove the MySQL table from state at some point.
>>>>>
>>>>> The only way to make it work is to use a user-defined TableFunction that queries the MySQL table via JDBC.
>>>>> However, please note that these calls would be synchronous, blocking calls.
>>>>>
>>>>> @Vino: Why do you think that the stream & stream join is not mature, and which problems do you see in the semantics?
>>>>> The semantics are correct (standard SQL semantics), and in my opinion the implementation is also mature.
>>>>> However, you should not use the non-windowed join if any of the input tables is ever-growing, because both sides must be held in state. This is not an issue of the semantics.
>>>>>
>>>>> Cheers,
>>>>> Fabian
>>>>>
>>>>> On Tue, Sep 25, 2018 at 2:00 PM vino yang <yanghua1...@gmail.com> wrote:
>>>>>
>>>>>> Hi Henry,
>>>>>>
>>>>>> 1) I don't recommend this method very much, but you said that you expect to convert the MySQL table to a stream and then to a Flink table. Under this premise, I said that you can do this by joining two stream tables. But as you know, this join depends on how long the state is retained. To make it equivalent to a dimension table, you must permanently keep the state of the stream table that is defined as a "dimension table."
>>>>>> I just said that modifying the relevant configuration in Flink can do this, but not for a single table.
>>>>>>
>>>>>> 2) Imagine that there are one million records in the two tables. The records of both tables are just beginning to stream into Flink, and the records of the dimension table have not fully arrived yet. Therefore, your matching results may not be as accurate as directly querying MySQL.
>>>>>>
>>>>>> In fact, the current stream & stream join is not very mature, and there are some problems in its semantics. I personally recommend that you go back to a stream/batch (MySQL) join. For more on the underlying principles, I recommend reading the book known as "DDIA".
>>>>>>
>>>>>> Thanks, vino.
>>>>>>
>>>>>> On Tue, Sep 25, 2018 at 5:48 PM 徐涛 <happydexu...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Vino,
>>>>>>> I do not quite understand some sentences below. Would you please explain them in a bit more detail?
>>>>>>> 1. "*such as setting the state retention time of one of the tables to be permanent*": as far as I know, the state retention time is a global config; I cannot set this property per table.
>>>>>>> 2. "*you may not be able to match the results, because the data belonging to the mysql table is just beginning to play as a stream*": why is it not able to match the results?
>>>>>>>
>>>>>>> Best
>>>>>>> Henry
>>>>>>>
>>>>>>> On Sep 25, 2018, at 5:29 PM, vino yang <yanghua1...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Henry,
>>>>>>>
>>>>>>> If you have converted the MySQL table to a Flink stream table, then in Flink Table/SQL a stream-stream join can also do this, for example by setting the state retention time of one of the tables to be permanent. But when the job has just started running, you may not be able to match the results, because the data belonging to the MySQL table is just beginning to play as a stream.
>>>>>>>
>>>>>>> Thanks, vino.
>>>>>>>
>>>>>>> On Tue, Sep 25, 2018 at 5:10 PM 徐涛 <happydexu...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Vino & Hequn,
>>>>>>>> I am now using the Table/SQL API. If I import the MySQL table as a stream and then convert it into a table, it seems that this can also be a workaround for batch/streaming joins. May I ask how this differs from the UDTF method? Does this implementation have any defects?
>>>>>>>> Best
>>>>>>>> Henry
>>>>>>>>
>>>>>>>> On Sep 22, 2018, at 10:28 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi
>>>>>>>>
>>>>>>>> +1 for vino's answer.
>>>>>>>> Also, this kind of join will be supported in FLINK-9712 <https://issues.apache.org/jira/browse/FLINK-9712>. You can find more details in the JIRA.
>>>>>>>>
>>>>>>>> Best, Hequn
>>>>>>>>
>>>>>>>> On Fri, Sep 21, 2018 at 4:51 PM vino yang <yanghua1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Henry,
>>>>>>>>>
>>>>>>>>> There are three ways I can think of:
>>>>>>>>>
>>>>>>>>> 1) use the DataStream API and implement a flatMap UDF to access the dimension table;
>>>>>>>>> 2) use the Table/SQL API and implement a UDTF to access the dimension table;
>>>>>>>>> 3) customize the implementation of the Table/SQL join API/statement (and change the physical plan).
>>>>>>>>>
>>>>>>>>> Thanks, vino.
>>>>>>>>>
>>>>>>>>> On Fri, Sep 21, 2018 at 4:43 PM 徐涛 <happydexu...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>> Sometimes a "dimension table" needs to be joined with the "fact table", if the data are not joined before being sent to Kafka.
>>>>>>>>>> So if the data are joined in Flink, does the "dimension table" have to be imported as a stream, or are there other ways to achieve it?
>>>>>>>>>> Thanks a lot!
>>>>>>>>>>
>>>>>>>>>> Best
>>>>>>>>>> Henry
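Regarding option 2) in vino's first reply (a UDTF that accesses the dimension table), which is also what Fabian suggested (a user-defined TableFunction that queries the MySQL table via JDBC), here is a minimal sketch of the per-record lookup pattern. It is plain Python, not Flink code: the standard-library `sqlite3` module stands in for MySQL/JDBC, the hypothetical `DimLookup.eval` only mimics the role of a TableFunction's `eval` method, and the `users` table and its columns are made up for illustration.

```python
import sqlite3
from typing import Iterable, Iterator, Tuple

Fact = Tuple[int, int]  # hypothetical fact-stream schema: (user_id, amount)


class DimLookup:
    """Hypothetical analogue of a UDTF that queries a dimension table per row."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn  # sqlite3 stands in for a MySQL connection via JDBC

    def eval(self, user_id: int) -> Iterator[Tuple[str, str]]:
        # One synchronous, blocking query per incoming record.
        rows = self._conn.execute(
            "SELECT name, city FROM users WHERE id = ?", (user_id,)
        ).fetchall()
        yield from rows


def enrich(facts: Iterable[Fact], lookup: DimLookup) -> Iterator[Tuple[int, int, str, str]]:
    # Inner-join semantics: a fact row without a matching dimension row
    # produces no output (comparable to `FROM facts, LATERAL TABLE(...)`).
    for user_id, amount in facts:
        for name, city in lookup.eval(user_id):
            yield (user_id, amount, name, city)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, city TEXT)")
    conn.executemany(
        "INSERT INTO users VALUES (?, ?, ?)",
        [(1, "alice", "Berlin"), (2, "bob", "Hangzhou")],
    )
    lookup = DimLookup(conn)
    print(list(enrich([(1, 10), (3, 5), (2, 7)], lookup)))
    # [(1, 10, 'alice', 'Berlin'), (2, 7, 'bob', 'Hangzhou')]
    conn.execute("UPDATE users SET city = 'Shanghai' WHERE id = 2")
    print(list(enrich([(2, 1)], lookup)))
    # [(2, 1, 'bob', 'Shanghai')]
```

Nothing from the fact stream is kept in state, and later lookups see dimension updates (the second call picks up the new city). The price, as Fabian noted, is one blocking query per record; caching or asynchronous lookups are common mitigations but are not shown here.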