Hi vino,

Thanks for sharing the link. It's a great book and I will take a look.
There are kinds of join. Different joins have different semantics. From the
link, I think it means the time versioned join.  FLINK-9712
<https://issues.apache.org/jira/browse/FLINK-9712> enrichments joins with
Time Versioned Functions and the result is deterministic under eventime.

Best, Hequn

On Tue, Sep 25, 2018 at 11:05 PM vino yang <yanghua1...@gmail.com> wrote:

> Hi Hequn,
>
> The specific content of the book does not give a right or wrong
> conclusion, but it illustrates this phenomenon: two streams of the same
> input, playing and joining at the same time, due to the order of events,
> the connection results are uncertain. This is because the two streams are
> intertwined in different forms. This has nothing to do with orderby, just
> that it exists in the stream stream join. Of course, this phenomenon is
> only a comparison statement with a non-stream join.
>
> In addition, I recommend this book, which is very famous on Twitter and
> Amazon. Because you are also Chinese, there is a good translation here. If
> I guess it is correct, the main translator is also from your company. This
> part of what I mentioned is here.[1]
>
> [1]:
> https://github.com/Vonng/ddia/blob/master/ch11.md#%E8%BF%9E%E6%8E%A5%E7%9A%84%E6%97%B6%E9%97%B4%E4%BE%9D%E8%B5%96%E6%80%A7
>
> Thanks, vino.
>
> Hequn Cheng <chenghe...@gmail.com> 于2018年9月25日周二 下午9:45写道:
>
>> Hi vino,
>>
>> There are no order problems of stream-stream join in Flink. No matter
>> what order the elements come, stream-stream join in Flink will output
>> results which consistent with standard SQL semantics. I haven't read the
>> book you mentioned. For join, it doesn't guarantee output orders. You have
>> to do orderBy if you want to get ordered results.
>>
>> Best, Hequn
>>
>> On Tue, Sep 25, 2018 at 8:36 PM vino yang <yanghua1...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> I may not have stated it here, and there is no semantic problem at the
>>> Flink implementation level. Rather, there may be “Time-dependence” here. [1]
>>>
>>> Yes, my initial answer was not to use this form of join in this
>>> scenario, but Henry said he converted the table into a stream table and
>>> asked about the feasibility of other methods.
>>>
>>> [1]: 《Designing Data-Intensive Applications》By Martin Kleppmann, Part 3:
>>> Derived Data, Chapter 11: Stream Processing , Stream Joins.
>>>
>>> some content :
>>>
>>> *If the ordering of events across streams is undetermined, the join
>>> becomes nondeter‐ ministic [87], which means you cannot rerun the same job
>>> on the same input and necessarily get the same result: the events on the
>>> input streams may be interleaved in a different way when you run the job
>>> again. *
>>>
>>>
>>> Fabian Hueske <fhue...@gmail.com> 于2018年9月25日周二 下午8:08写道:
>>>
>>>> Hi,
>>>>
>>>> I don't think that using the current join implementation in the Table
>>>> API / SQL will work.
>>>> The non-windowed join fully materializes *both* input tables in state.
>>>> This is necessary, because the join needs to be able to process updates on
>>>> either side.
>>>> While this is not a problem for the fixed sized MySQL table,
>>>> materializing the append-only table (aka stream) is probably not what you
>>>> want.
>>>> You can also not limit idle state retention because it would remove the
>>>> MySQL table from state at some point.
>>>>
>>>> The only way to make it work is using a user-defined TableFunction that
>>>> queries the MySQL table via JDBC.
>>>> However, please note that these calls would be synchronous, blocking
>>>> calls.
>>>>
>>>> @Vino: Why do you think that the stream & stream join is not mature and
>>>> which problems do you see in the semantics?
>>>> The semantics are correct (standard SQL semantics) and in my opinion
>>>> the implementation is also mature.
>>>> However, you should not use the non-windowed join if any of the input
>>>> tables is ever growing because both sides must be hold in state. This is
>>>> not an issue of the semantics.
>>>>
>>>> Cheers,
>>>> Fabian
>>>>
>>>> Am Di., 25. Sep. 2018 um 14:00 Uhr schrieb vino yang <
>>>> yanghua1...@gmail.com>:
>>>>
>>>>> Hi Henry,
>>>>>
>>>>> 1) I don't recommend this method very much, but you said that you
>>>>> expect to convert mysql table to stream and then to flink table. Under 
>>>>> this
>>>>> premise, I said that you can do this by joining two stream tables. But as
>>>>> you know, this join depends on the time period in which the state is 
>>>>> saved.
>>>>> To make it equivalent to a dimension table, you must permanently save the
>>>>> state of the stream table that is defined as a "dimension table." I just
>>>>> said that modifying the relevant configuration in Flink can do this, Not
>>>>> for a single table.
>>>>>
>>>>> 2) Imagine that there are one million records in two tables. The
>>>>> records in both tables are just beginning to stream into flink, and the
>>>>> records as dimension tables are not fully arrived. Therefore, your 
>>>>> matching
>>>>> results may not be as accurate as directly querying Mysql.
>>>>>
>>>>> In fact, the current stream & stream join is not very mature, there
>>>>> are some problems in semantics, I personally recommend that you return to
>>>>> stream/batch (mysql) join. For more principle content, I recommend you 
>>>>> read
>>>>> a book, referred to as 《DDIA》.
>>>>>
>>>>> Thanks, vino.
>>>>>
>>>>> 徐涛 <happydexu...@gmail.com> 于2018年9月25日周二 下午5:48写道:
>>>>>
>>>>>> Hi Vino,
>>>>>> I do not quite understand in some sentences below, would you please
>>>>>> help explain it a bit more detailedly?
>>>>>> 1. “*such as setting the state retention time of one of the tables
>>>>>> to be permanent*” , as I know, the state retention time is a global
>>>>>> config, I can not set this property per table.
>>>>>> 2. "*you may not be able to match the results, because the data
>>>>>> belonging to the mysql table is just beginning to play as a stream*”
>>>>>>  Why it is not able to match the results?
>>>>>>
>>>>>> Best
>>>>>> Henry
>>>>>>
>>>>>> 在 2018年9月25日,下午5:29,vino yang <yanghua1...@gmail.com> 写道:
>>>>>>
>>>>>> Hi Henry,
>>>>>>
>>>>>> If you have converted the mysql table to a flink stream table. In
>>>>>> flink table/sql, streams and stream joins can also do this, such as 
>>>>>> setting
>>>>>> the state retention time of one of the tables to be permanent. But when 
>>>>>> the
>>>>>> job is just running, you may not be able to match the results, because 
>>>>>> the
>>>>>> data belonging to the mysql table is just beginning to play as a stream.
>>>>>>
>>>>>> Thanks, vino.
>>>>>>
>>>>>> 徐涛 <happydexu...@gmail.com> 于2018年9月25日周二 下午5:10写道:
>>>>>>
>>>>>>> Hi Vino & Hequn,
>>>>>>> I am now using the table/sql API, if I import the mysql table as a
>>>>>>> stream then convert it into a table, it seems that it can also be a
>>>>>>> workaround for batch/streaming joining. May I ask what is the difference
>>>>>>> between the UDTF method? Does this implementation has some defects?
>>>>>>> Best
>>>>>>> Henry
>>>>>>>
>>>>>>> 在 2018年9月22日,上午10:28,Hequn Cheng <chenghe...@gmail.com> 写道:
>>>>>>>
>>>>>>> Hi
>>>>>>>
>>>>>>> +1 for vino's answer.
>>>>>>> Also, this kind of join will be supported in FLINK-9712
>>>>>>> <https://issues.apache.org/jira/browse/FLINK-9712>. You can check
>>>>>>> more details in the jira.
>>>>>>>
>>>>>>> Best, Hequn
>>>>>>>
>>>>>>> On Fri, Sep 21, 2018 at 4:51 PM vino yang <yanghua1...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Henry,
>>>>>>>>
>>>>>>>> There are three ways I can think of:
>>>>>>>>
>>>>>>>> 1) use DataStream API, implement a flatmap UDF to access dimension
>>>>>>>> table;
>>>>>>>> 2) use table/sql API, implement a UDTF to access dimension table;
>>>>>>>> 3) customize the table/sql join API/statement's implementation (and
>>>>>>>> change the physical plan)
>>>>>>>>
>>>>>>>> Thanks, vino.
>>>>>>>>
>>>>>>>> 徐涛 <happydexu...@gmail.com> 于2018年9月21日周五 下午4:43写道:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>         Sometimes some “dimension table” need to be joined from
>>>>>>>>> the "fact table", if data are not joined before sent to Kafka.
>>>>>>>>>         So if the data are joined in Flink, does the “dimension
>>>>>>>>> table” have to be import as a stream, or there are some other ways can
>>>>>>>>> achieve it?
>>>>>>>>>         Thanks a lot!
>>>>>>>>>
>>>>>>>>> Best
>>>>>>>>> Henry
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>

Reply via email to