Hi Henry,

Yes, you are correct. There are basically two ways to join a temporal
table. One is provided in Flink, and the other is provided in Blink, which
has been pushed as a branch[1] in the Flink repo.

- Join a Temporal Table in Flink[2][3][4]
As the documentation says, a join with a temporal table joins an
append-only table (left input/probe side) with a temporal table (right
input/build side), i.e., a table that changes over time and tracks its
changes. You need to define a temporal table function, which provides
access to the state of a temporal table at a specific point in time.
*Both rowtime and proctime are supported.*
- Join a Temporal Table in Blink[5]
Unlike the join in Flink, it can join an *append/upsert/retract*
stream (left input/probe side) with a temporal table (right input/build
side), i.e., a *remote dimension table* that changes over time. To access
data in a temporal table, you need to define a TableSource that implements
LookupableTableSource[6] (you can download the Blink code and take a look
at `HBase143TableSource`, which is an implementation of
LookupableTableSource). Currently, only proctime is supported.
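
To make the "state of a temporal table at a specific point in time" idea
concrete, here is a minimal sketch in plain Python. It is not the Flink or
Blink API; `VersionedTable`, `as_of`, `temporal_join` and the sample data are
hypothetical names made up for illustration. It only models the rowtime case:
each probe row is matched with the version that was valid at the row's own
timestamp.

```python
from bisect import bisect_right
from dataclasses import dataclass, field


@dataclass
class VersionedTable:
    """Toy stand-in for a temporal table: per key, a time-ordered version list."""
    _times: dict = field(default_factory=dict)   # key -> sorted version timestamps
    _values: dict = field(default_factory=dict)  # key -> values aligned with _times

    def upsert(self, key, ts, value):
        """Record that `key` has `value` from time `ts` onwards."""
        times = self._times.setdefault(key, [])
        values = self._values.setdefault(key, [])
        pos = bisect_right(times, ts)
        times.insert(pos, ts)
        values.insert(pos, value)

    def as_of(self, key, ts):
        """Return the value valid for `key` at time `ts`, or None if none exists yet."""
        times = self._times.get(key, [])
        pos = bisect_right(times, ts)
        return self._values[key][pos - 1] if pos else None


def temporal_join(probe_rows, table):
    """Inner-join each append-only probe row with the version valid at its timestamp."""
    out = []
    for ts, key, amount in probe_rows:
        rate = table.as_of(key, ts)
        if rate is not None:
            out.append((ts, key, amount, rate))
    return out


# Hypothetical sample data: a changing rates table and an append-only orders stream.
rates = VersionedTable()
rates.upsert("EUR", 1, 1.10)
rates.upsert("EUR", 5, 1.20)
rates.upsert("USD", 0, 1.00)

orders = [(2, "EUR", 10), (6, "EUR", 10), (3, "USD", 7), (4, "GBP", 1)]
joined = temporal_join(orders, rates)
```

Because the lookup is keyed on the row's own timestamp, the result does not
depend on the order in which the probe rows arrive. A proctime lookup would
instead use the table's state at the moment the row is processed.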

I think you can choose one according to your scenario.
There are some useful examples in the documents listed below; they may be
helpful to you. Feel free to ask if you have any other questions.

Best,
Hequn

[1] https://github.com/apache/flink/tree/blink
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
[3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html
[4] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#joins
[5] https://flink-china.org/doc/blink/dev/table/streaming/joins.html#join-with-a-temporal-table
[6] https://flink-china.org/doc/blink/dev/table/sourcesinks#defining-a-tablesource-with-lookupable

On Tue, Mar 12, 2019 at 2:13 PM 徐涛 <happydexu...@gmail.com> wrote:

> Hi Hequn,
> I want to implement a join between a stream and a dimension table in
> Flink SQL. I found a new feature named Temporal Tables, delivered in
> Flink 1.7, and I think it could be used to join a stream with a dimension
> table, but I am not sure about that. Could anyone help me with it?
> Thanks a lot for your help.
>
> Best
> Henry
>
> On Sep 26, 2018, at 12:16 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>
> Hi vino,
>
> Thanks for sharing the link. It's a great book and I will take a look.
> There are several kinds of joins, and different joins have different
> semantics. From the link, I think it means the time-versioned join. FLINK-9712
> <https://issues.apache.org/jira/browse/FLINK-9712> covers enrichment joins
> with Time Versioned Functions, and the result is deterministic under event
> time.
>
> Best, Hequn
>
> On Tue, Sep 25, 2018 at 11:05 PM vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi Hequn,
>>
>> The book does not draw a right-or-wrong conclusion, but it illustrates
>> this phenomenon: when two streams with the same input are replayed and
>> joined at the same time, the join results are nondeterministic because of
>> the order of events. This is because the two streams may be interleaved in
>> different ways. It has nothing to do with orderBy; it is simply inherent
>> in the stream-stream join. Of course, this observation only holds in
>> comparison with a non-stream join.
>>
>> In addition, I recommend this book, which is very well known on Twitter
>> and Amazon. Since you are also Chinese, there is a good translation here.
>> If I guess correctly, the main translator is also from your company. The
>> part I mentioned is here.[1]
>>
>> [1]:
>> https://github.com/Vonng/ddia/blob/master/ch11.md#%E8%BF%9E%E6%8E%A5%E7%9A%84%E6%97%B6%E9%97%B4%E4%BE%9D%E8%B5%96%E6%80%A7
>>
>> Thanks, vino.
>>
>> On Tue, Sep 25, 2018 at 9:45 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>>> Hi vino,
>>>
>>> There are no ordering problems with the stream-stream join in Flink. No
>>> matter what order the elements arrive in, the stream-stream join in Flink
>>> will output results that are consistent with standard SQL semantics. I
>>> haven't read the book you mentioned. A join does not guarantee output
>>> order. You have to use orderBy if you want to get ordered results.
>>>
>>> Best, Hequn
>>>
>>> On Tue, Sep 25, 2018 at 8:36 PM vino yang <yanghua1...@gmail.com> wrote:
>>>
>>>> Hi Fabian,
>>>>
>>>> I may not have stated it clearly: there is no semantic problem at the
>>>> Flink implementation level. Rather, there may be “time dependence”
>>>> here.[1]
>>>>
>>>> Yes, my initial answer was not to use this form of join in this
>>>> scenario, but Henry said he converted the table into a stream table and
>>>> asked about the feasibility of other methods.
>>>>
>>>> [1]: "Designing Data-Intensive Applications" by Martin Kleppmann, Part
>>>> 3: Derived Data, Chapter 11: Stream Processing, Stream Joins.
>>>>
>>>> Some content:
>>>>
>>>> *If the ordering of events across streams is undetermined, the join
>>>> becomes nondeterministic [87], which means you cannot rerun the same job
>>>> on the same input and necessarily get the same result: the events on the
>>>> input streams may be interleaved in a different way when you run the job
>>>> again.*
>>>>
>>>>
>>>> On Tue, Sep 25, 2018 at 8:08 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I don't think that using the current join implementation in the Table
>>>>> API / SQL will work.
>>>>> The non-windowed join fully materializes *both* input tables in state.
>>>>> This is necessary, because the join needs to be able to process updates on
>>>>> either side.
>>>>> While this is not a problem for the fixed-size MySQL table,
>>>>> materializing the append-only table (aka stream) is probably not what you
>>>>> want.
>>>>> You also cannot limit idle state retention, because that would
>>>>> eventually remove the MySQL table from state.
>>>>>
>>>>> The only way to make it work is using a user-defined TableFunction
>>>>> that queries the MySQL table via JDBC.
>>>>> However, please note that these calls would be synchronous, blocking
>>>>> calls.
>>>>>
>>>>> @Vino: Why do you think that the stream & stream join is not mature
>>>>> and which problems do you see in the semantics?
>>>>> The semantics are correct (standard SQL semantics) and in my opinion
>>>>> the implementation is also mature.
>>>>> However, you should not use the non-windowed join if any of the input
>>>>> tables is ever-growing, because both sides must be held in state. This is
>>>>> not an issue of the semantics.
>>>>>
>>>>> Cheers,
>>>>> Fabian
>>>>>
>>>>> On Tue, Sep 25, 2018 at 2:00 PM vino yang <yanghua1...@gmail.com> wrote:
>>>>>
>>>>>> Hi Henry,
>>>>>>
>>>>>> 1) I don't particularly recommend this method, but you said that you
>>>>>> expect to convert the MySQL table to a stream and then to a Flink table.
>>>>>> Under this premise, I said that you can do this by joining two stream
>>>>>> tables. But as you know, this join depends on how long the state is
>>>>>> retained. To make it equivalent to a dimension table, you must
>>>>>> permanently retain the state of the stream table that is defined as the
>>>>>> "dimension table." I only said that this can be done by modifying the
>>>>>> relevant configuration in Flink, not that it can be set for a single
>>>>>> table.
>>>>>>
>>>>>> 2) Imagine that there are one million records in each of the two tables.
>>>>>> The records of both tables have only just begun to stream into Flink, and
>>>>>> the records of the dimension table have not all arrived yet. Therefore,
>>>>>> your matching results may not be as accurate as directly querying MySQL.
>>>>>>
>>>>>> In fact, the current stream & stream join is not very mature, and there
>>>>>> are some problems in its semantics. I personally recommend that you go
>>>>>> back to a stream/batch (MySQL) join. For more on the underlying
>>>>>> principles, I recommend you read a book known as "DDIA".
>>>>>>
>>>>>> Thanks, vino.
>>>>>>
>>>>>> On Tue, Sep 25, 2018 at 5:48 PM 徐涛 <happydexu...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Vino,
>>>>>>> I do not quite understand some of the sentences below. Would you please
>>>>>>> explain them in a bit more detail?
>>>>>>> 1. “*such as setting the state retention time of one of the tables
>>>>>>> to be permanent*”: as far as I know, the state retention time is a
>>>>>>> global config; I cannot set this property per table.
>>>>>>> 2. “*you may not be able to match the results, because the data
>>>>>>> belonging to the mysql table is just beginning to play as a stream*”:
>>>>>>> why is it not able to match the results?
>>>>>>>
>>>>>>> Best
>>>>>>> Henry
>>>>>>>
>>>>>>> On Sep 25, 2018, at 5:29 PM, vino yang <yanghua1...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Henry,
>>>>>>>
>>>>>>> If you have converted the mysql table to a flink stream table, then in
>>>>>>> flink table/sql a stream-stream join can also do this, such as setting
>>>>>>> the state retention time of one of the tables to be permanent. But when
>>>>>>> the job has just started running, you may not be able to match the
>>>>>>> results, because the data belonging to the mysql table is just beginning
>>>>>>> to play as a stream.
>>>>>>>
>>>>>>> Thanks, vino.
>>>>>>>
>>>>>>> On Tue, Sep 25, 2018 at 5:10 PM 徐涛 <happydexu...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Vino & Hequn,
>>>>>>>> I am now using the table/sql API. If I import the MySQL table as a
>>>>>>>> stream and then convert it into a table, it seems that this can also be
>>>>>>>> a workaround for batch/streaming joins. May I ask how this differs from
>>>>>>>> the UDTF method? Does this implementation have any defects?
>>>>>>>> Best
>>>>>>>> Henry
>>>>>>>>
>>>>>>>> On Sep 22, 2018, at 10:28 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi
>>>>>>>>
>>>>>>>> +1 for vino's answer.
>>>>>>>> Also, this kind of join will be supported in FLINK-9712
>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-9712>. You can find
>>>>>>>> more details in the JIRA issue.
>>>>>>>>
>>>>>>>> Best, Hequn
>>>>>>>>
>>>>>>>> On Fri, Sep 21, 2018 at 4:51 PM vino yang <yanghua1...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Henry,
>>>>>>>>>
>>>>>>>>> There are three ways I can think of:
>>>>>>>>>
>>>>>>>>> 1) use DataStream API, implement a flatmap UDF to access dimension
>>>>>>>>> table;
>>>>>>>>> 2) use table/sql API, implement a UDTF to access dimension table;
>>>>>>>>> 3) customize the table/sql join API/statement's implementation
>>>>>>>>> (and change the physical plan)
>>>>>>>>>
>>>>>>>>> Thanks, vino.
>>>>>>>>>
>>>>>>>>> On Fri, Sep 21, 2018 at 4:43 PM 徐涛 <happydexu...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>         Sometimes a “dimension table” needs to be joined with
>>>>>>>>>> the “fact table” if the data are not joined before being sent to Kafka.
>>>>>>>>>>         So if the data are joined in Flink, does the “dimension
>>>>>>>>>> table” have to be imported as a stream, or are there other ways to
>>>>>>>>>> achieve it?
>>>>>>>>>>         Thanks a lot!
>>>>>>>>>>
>>>>>>>>>> Best
>>>>>>>>>> Henry
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>
