Hi Henry,

These are good questions!
I would rather not add the "temporal" and "lateral" prefixes in front of
"join". The temporal table is a concept orthogonal to the join. We should say
"join a temporal table" or "join a lateral table".
1. You can of course use a stream-stream join. Introducing the temporal table
not only makes the query simpler but also improves performance. More
details can be found in [1].
2. Both joins are based on the concept of a temporal table, i.e., a table
joins a temporal table.
3. Yes, actually the join in Flink uses a lateral table and a
TemporalTableFunction to implement a temporal table. A temporal table
is a versioned table, and a lateral table is a table that keeps references to
the previous table. If you do not want to use time versioning, you don't need
the temporal table.
4. It is a kind of join. The JOIN keyword can be omitted if it is an inner
join. The grammar will not be changed in the near future. I haven't heard
any news about changing it.
5. Yes, it will be optimized.
6. If you want to left join a temporal table, you can write SQL like:

SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  LEFT JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency
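
To make the semantics of the query above concrete, here is a minimal plain-Python sketch of a left join against a versioned table. It is not Flink API: the names VersionedRates, upsert, as_of and left_join_as_of, as well as the sample data, are hypothetical and exist only for illustration. It also uses explicit integer timestamps instead of processing time so that the result is reproducible.

```python
from bisect import bisect_right


class VersionedRates:
    """Toy stand-in for a temporal (versioned) table keyed by currency."""

    def __init__(self):
        self._times = {}  # currency -> version timestamps, ascending
        self._rates = {}  # currency -> rates, aligned with _times

    def upsert(self, currency, time, rate):
        # Assumption: updates for one currency arrive in time order.
        self._times.setdefault(currency, []).append(time)
        self._rates.setdefault(currency, []).append(rate)

    def as_of(self, currency, time):
        """Return the rate valid at `time`, or None if no version exists yet."""
        times = self._times.get(currency, [])
        idx = bisect_right(times, time)
        return self._rates[currency][idx - 1] if idx else None


def left_join_as_of(orders, rates):
    """Emit one row per order; unmatched orders keep None (NULL) rate columns."""
    out = []
    for amount, currency, time in orders:
        rate = rates.as_of(currency, time)
        converted = None if rate is None else amount * rate
        out.append((amount, currency, rate, converted))
    return out


rates = VersionedRates()
rates.upsert("EUR", 1, 114)
rates.upsert("USD", 2, 102)
rates.upsert("EUR", 5, 116)

# (amount, currency, time)
orders = [(2, "EUR", 3), (1, "EUR", 6), (3, "JPY", 4)]

for row in left_join_as_of(orders, rates):
    print(row)
# (2, 'EUR', 114, 228)
# (1, 'EUR', 116, 116)
# (3, 'JPY', None, None)
```

Only the left (probe) side drives the output: each order looks up exactly one version of the right side, and an order without a matching version (JPY here) is still emitted with NULL rate columns, which is what distinguishes the left join from the inner join.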

CC @Piotr Nowojski <pi...@data-artisans.com>  Would be great to have your
opinions here.

Best,
Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table


On Wed, Mar 13, 2019 at 1:59 PM 徐涛 <happydexu...@gmail.com> wrote:

> Hi Hequn,
> Thanks a lot for your answer! That is very helpful for me.
> I still have some questions about stream and dimension data join and
> temporal table join:
> 1. I found that the temporal table join is still a join driven by one
> stream. I do not know why the dimension data join has to be driven by one
> stream. Why can it not be done by a two-stream join (a traditional
> stream-stream join)?
> I will try to answer this myself: a two-stream join is based on
> materializing the data of both streams in state, but due to state
> retention, the dimension data may be lost. I guess this is one
> reason, am I correct?
> 2. Is Blink's stream and dimension data join based on the temporal table join?
> 3. I think a lateral table join can also do a dimension join if I do
> not want to use time versioning. How do I choose between a temporal table
> join and a lateral table join?
> 4. I found that the temporal table join in Flink uses the “LATERAL TABLE”
> grammar rather than “JOIN”. That is OK, but it is not as easy to use as
> “JOIN”. Will the community modify the grammar in future releases?
> 5. In the following temporal table join statement, will joining the Orders
> table with Rates produce too much data before the WHERE clause takes effect?
> Will it be optimized?
>
> SELECT
>   o.amount * r.rate AS amount
> FROM
>   Orders AS o,
>   LATERAL TABLE (Rates(o.rowtime)) AS r
> WHERE r.currency = o.currency
>
> 6. How can I use a temporal table join to do a left join?
>
>
> Best
> Henry
>
> On Mar 13, 2019, at 12:02 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>
> Hi Henry,
>
> Yes, you are correct. Basically, there are two ways you can use to join a
> Temporal Table. One is provided in Flink and the other is provided in Blink
> which has been pushed as a branch[1] in Flink repo.
>
> - Join a Temporal Table in Flink[2][3][4]
> As the documentation says, a join with a temporal table joins an
> append-only table (left input/probe side) with a temporal table (right
> input/build side), i.e., a table that changes over time and tracks its
> changes. You need to define a temporal table function, which is used
> to provide access to the state of a temporal table at a specific point in
> time. *Both rowtime and proctime are supported.*
> - Join a Temporal Table in Blink[5]
> Unlike the join in Flink, it can join an *append/upsert/retract*
> stream (left input/probe side) with a temporal table (right input/build
> side), i.e., a *remote dimension table* that changes over time. In order to
> access data in a temporal table, you need to define a TableSource with
> LookupableTableSource[6]. (You can probably download the Blink code and
> take a look at the `HBase143TableSource`, which is an implementation of
> LookupableTableSource.) Currently, only proctime is supported.
>
> I think you can choose one according to your scenario.
> There are some useful examples in the documents listed below. They may be
> very helpful for you. Feel free to ask if you have any other questions.
>
> Best,
> Hequn
>
> [1] https://github.com/apache/flink/tree/blink
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>
> [3]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html
> [4]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#joins
> [5]
> https://flink-china.org/doc/blink/dev/table/streaming/joins.html#join-with-a-temporal-table
> [6]
> https://flink-china.org/doc/blink/dev/table/sourcesinks#defining-a-tablesource-with-lookupable
>
> On Tue, Mar 12, 2019 at 2:13 PM 徐涛 <happydexu...@gmail.com> wrote:
>
>> Hi Hequn,
>> I want to implement a join between a stream and a dimension table in Flink
>> SQL. I found there is a new feature named Temporal Tables delivered in
>> Flink 1.7, and I think it could perhaps be used to achieve the join between
>> a stream and a dimension table. But I am not sure about that. Could anyone
>> help me with it?
>> Thanks a lot for your help.
>>
>> Best
>> Henry
>>
>> On Sep 26, 2018, at 12:16 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>> Hi vino,
>>
>> Thanks for sharing the link. It's a great book and I will take a look.
>> There are several kinds of joins, and different joins have different
>> semantics. From the link, I think it means the time-versioned join. FLINK-9712
>> <https://issues.apache.org/jira/browse/FLINK-9712> covers enrichment joins
>> with Time Versioned Functions, and the result is deterministic under
>> event time.
>>
>> Best, Hequn
>>
>> On Tue, Sep 25, 2018 at 11:05 PM vino yang <yanghua1...@gmail.com> wrote:
>>
>>> Hi Hequn,
>>>
>>> The book does not draw a right-or-wrong conclusion, but it illustrates
>>> this phenomenon: when two streams from the same input are replayed and
>>> joined at the same time, the join results are nondeterministic because of
>>> the order of events. This is because the two streams may be interleaved
>>> in different ways. This has nothing to do with orderBy; it is simply
>>> inherent in the stream-stream join. Of course, this phenomenon only
>>> stands out in comparison with a non-stream join.
>>>
>>> In addition, I recommend this book, which is very famous on Twitter and
>>> Amazon. Since you are also Chinese, there is a good translation here. If
>>> I guess correctly, the main translator is also from your company. The
>>> part I mentioned is here.[1]
>>>
>>> [1]:
>>> https://github.com/Vonng/ddia/blob/master/ch11.md#%E8%BF%9E%E6%8E%A5%E7%9A%84%E6%97%B6%E9%97%B4%E4%BE%9D%E8%B5%96%E6%80%A7
>>>
>>> Thanks, vino.
>>>
>>> On Tue, Sep 25, 2018 at 9:45 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>>>
>>>> Hi vino,
>>>>
>>>> There are no ordering problems with the stream-stream join in Flink. No
>>>> matter in what order the elements arrive, the stream-stream join in Flink
>>>> will output results that are consistent with standard SQL semantics. I
>>>> haven't read the book you mentioned. A join doesn't guarantee output
>>>> order. You have to use orderBy if you want ordered results.
>>>>
>>>> Best, Hequn
>>>>
>>>> On Tue, Sep 25, 2018 at 8:36 PM vino yang <yanghua1...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> I may not have stated it clearly here: there is no semantic problem at
>>>>> the Flink implementation level. Rather, there may be “time-dependence”
>>>>> here. [1]
>>>>>
>>>>> Yes, my initial answer was not to use this form of join in this
>>>>> scenario, but Henry said he converted the table into a stream table and
>>>>> asked about the feasibility of other methods.
>>>>>
>>>>> [1]: 《Designing Data-Intensive Applications》 by Martin Kleppmann, Part
>>>>> 3: Derived Data, Chapter 11: Stream Processing, Stream Joins.
>>>>>
>>>>> Some content:
>>>>>
>>>>> *If the ordering of events across streams is undetermined, the join
>>>>> becomes nondeterministic [87], which means you cannot rerun the same job
>>>>> on the same input and necessarily get the same result: the events on the
>>>>> input streams may be interleaved in a different way when you run the job
>>>>> again.*
>>>>>
>>>>>
>>>>> On Tue, Sep 25, 2018 at 8:08 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I don't think that using the current join implementation in the Table
>>>>>> API / SQL will work.
>>>>>> The non-windowed join fully materializes *both* input tables in
>>>>>> state. This is necessary, because the join needs to be able to process
>>>>>> updates on either side.
>>>>>> While this is not a problem for the fixed-size MySQL table,
>>>>>> materializing the append-only table (aka stream) is probably not what you
>>>>>> want.
>>>>>> You also cannot limit idle state retention, because it would remove
>>>>>> the MySQL table from state at some point.
>>>>>>
>>>>>> The only way to make it work is using a user-defined TableFunction
>>>>>> that queries the MySQL table via JDBC.
>>>>>> However, please note that these calls would be synchronous, blocking
>>>>>> calls.
>>>>>>
>>>>>> @Vino: Why do you think that the stream & stream join is not mature
>>>>>> and which problems do you see in the semantics?
>>>>>> The semantics are correct (standard SQL semantics) and in my opinion
>>>>>> the implementation is also mature.
>>>>>> However, you should not use the non-windowed join if any of the input
>>>>>> tables is ever-growing, because both sides must be held in state. This is
>>>>>> not an issue of the semantics.
>>>>>>
>>>>>> Cheers,
>>>>>> Fabian
>>>>>>
>>>>>> On Tue, Sep 25, 2018 at 2:00 PM vino yang <yanghua1...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Henry,
>>>>>>>
>>>>>>> 1) I don't really recommend this method, but you said that you
>>>>>>> expect to convert the MySQL table to a stream and then to a Flink
>>>>>>> table. Under this premise, I said that you can do this by joining two
>>>>>>> stream tables. But as you know, this join depends on the time period
>>>>>>> for which the state is retained. To make it equivalent to a dimension
>>>>>>> table, you must permanently retain the state of the stream table that
>>>>>>> is defined as the "dimension table." I only said that modifying the
>>>>>>> relevant configuration in Flink can do this, not that it can be set
>>>>>>> for a single table.
>>>>>>>
>>>>>>> 2) Imagine that there are one million records in each of the two
>>>>>>> tables. The records of both tables have only just begun to stream into
>>>>>>> Flink, and the records of the dimension table have not fully arrived
>>>>>>> yet. Therefore, your matching results may not be as accurate as
>>>>>>> directly querying MySQL.
>>>>>>>
>>>>>>> In fact, the current stream & stream join is not very mature, and
>>>>>>> there are some problems in its semantics. I personally recommend that
>>>>>>> you return to the stream/batch (MySQL) join. For more on the
>>>>>>> principles, I recommend you read a book known as 《DDIA》.
>>>>>>>
>>>>>>> Thanks, vino.
>>>>>>>
>>>>>>> On Tue, Sep 25, 2018 at 5:48 PM 徐涛 <happydexu...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Vino,
>>>>>>>> I do not quite understand some of the sentences below. Would you
>>>>>>>> please explain them in a bit more detail?
>>>>>>>> 1. “*such as setting the state retention time of one of the tables
>>>>>>>> to be permanent*”: as far as I know, the state retention time is a
>>>>>>>> global config, so I cannot set this property per table.
>>>>>>>> 2. “*you may not be able to match the results, because the data
>>>>>>>> belonging to the mysql table is just beginning to play as a stream*”:
>>>>>>>> why would it not be able to match the results?
>>>>>>>>
>>>>>>>> Best
>>>>>>>> Henry
>>>>>>>>
>>>>>>>> On Sep 25, 2018, at 5:29 PM, vino yang <yanghua1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi Henry,
>>>>>>>>
>>>>>>>> If you have converted the mysql table to a flink stream table, then
>>>>>>>> in flink table/sql a stream-stream join can also do this, such as
>>>>>>>> setting the state retention time of one of the tables to be
>>>>>>>> permanent. But when the job is just running, you may not be able to
>>>>>>>> match the results, because the data belonging to the mysql table is
>>>>>>>> just beginning to play as a stream.
>>>>>>>>
>>>>>>>> Thanks, vino.
>>>>>>>>
>>>>>>>> On Tue, Sep 25, 2018 at 5:10 PM 徐涛 <happydexu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Vino & Hequn,
>>>>>>>>> I am now using the table/sql API. If I import the mysql table as a
>>>>>>>>> stream and then convert it into a table, it seems that this can also
>>>>>>>>> be a workaround for batch/streaming joining. May I ask what the
>>>>>>>>> difference is compared with the UDTF method? Does this
>>>>>>>>> implementation have any defects?
>>>>>>>>> Best
>>>>>>>>> Henry
>>>>>>>>>
>>>>>>>>> On Sep 22, 2018, at 10:28 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi
>>>>>>>>>
>>>>>>>>> +1 for vino's answer.
>>>>>>>>> Also, this kind of join will be supported in FLINK-9712
>>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-9712>. You can check
>>>>>>>>> more details in the jira.
>>>>>>>>>
>>>>>>>>> Best, Hequn
>>>>>>>>>
>>>>>>>>> On Fri, Sep 21, 2018 at 4:51 PM vino yang <yanghua1...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Henry,
>>>>>>>>>>
>>>>>>>>>> There are three ways I can think of:
>>>>>>>>>>
>>>>>>>>>> 1) use DataStream API, implement a flatmap UDF to access
>>>>>>>>>> dimension table;
>>>>>>>>>> 2) use table/sql API, implement a UDTF to access dimension table;
>>>>>>>>>> 3) customize the table/sql join API/statement's implementation
>>>>>>>>>> (and change the physical plan)
>>>>>>>>>>
>>>>>>>>>> Thanks, vino.
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 21, 2018 at 4:43 PM 徐涛 <happydexu...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>>         Sometimes a “dimension table” needs to be joined with
>>>>>>>>>>> the "fact table", if the data are not joined before being sent to
>>>>>>>>>>> Kafka.
>>>>>>>>>>>         So if the data are joined in Flink, does the “dimension
>>>>>>>>>>> table” have to be imported as a stream, or are there other ways to
>>>>>>>>>>> achieve it?
>>>>>>>>>>>         Thanks a lot!
>>>>>>>>>>>
>>>>>>>>>>> Best
>>>>>>>>>>> Henry
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>
>
