Hi vino,

Thanks for sharing the link. It's a great book and I will take a look. There are different kinds of joins, and different joins have different semantics. From the link, I think it refers to the time-versioned join. FLINK-9712 <https://issues.apache.org/jira/browse/FLINK-9712> enriches joins with time-versioned functions, and the result is deterministic under event time.
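To illustrate what "deterministic under event time" means, here is a minimal plain-Python sketch (not Flink API code; the rates table, keys, and timestamps are all made up): a versioned join resolves each fact record against the dimension version that was valid at the fact's event time, so the result does not depend on arrival order.

```python
import bisect

# Versioned "dimension table": for each key, a sorted list of
# (valid_from_timestamp, value) versions.
rates = {
    "EUR": [(0, 1.10), (10, 1.12), (20, 1.15)],
}

def temporal_lookup(key, event_time):
    """Return the dimension value that was valid at event_time."""
    versions = rates[key]
    # Latest version whose valid_from <= event_time.
    idx = bisect.bisect_right(versions, (event_time, float("inf"))) - 1
    return versions[idx][1]

# Fact events carry their own event time, so the joined result is a pure
# function of (key, event_time) and is independent of arrival order.
orders = [("EUR", 5, 100), ("EUR", 15, 200), ("EUR", 25, 300)]
joined = [amount * temporal_lookup(cur, ts) for cur, ts, amount in orders]
print(joined)  # same result no matter how the two streams interleave
```

Replaying the `orders` list in any order yields the same `joined` values, which is the determinism property the JIRA issue describes.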
Best, Hequn

On Tue, Sep 25, 2018 at 11:05 PM vino yang <yanghua1...@gmail.com> wrote:

> Hi Hequn,
>
> The book does not draw a right-or-wrong conclusion; rather, it illustrates
> a phenomenon: when two streams of the same input are replayed and joined,
> the join results are uncertain because, depending on the order of events,
> the two streams may interleave in different ways. This has nothing to do
> with ORDER BY; it is simply inherent to stream-stream joins. Of course,
> this observation only holds in comparison with non-stream joins.
>
> In addition, I recommend this book, which is very famous on Twitter and
> Amazon. Since you are also Chinese, there is a good translation here. If
> my guess is correct, the main translator is also from your company. The
> part I mentioned is here. [1]
>
> [1]:
> https://github.com/Vonng/ddia/blob/master/ch11.md#%E8%BF%9E%E6%8E%A5%E7%9A%84%E6%97%B6%E9%97%B4%E4%BE%9D%E8%B5%96%E6%80%A7
>
> Thanks, vino.
>
> On Tue, Sep 25, 2018 at 9:45 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi vino,
>>
>> There are no ordering problems with stream-stream joins in Flink. No
>> matter in what order the elements arrive, a stream-stream join in Flink
>> will output results that are consistent with standard SQL semantics. I
>> haven't read the book you mentioned. A join does not guarantee output
>> order; you have to use ORDER BY if you want ordered results.
>>
>> Best, Hequn
>>
>> On Tue, Sep 25, 2018 at 8:36 PM vino yang <yanghua1...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> I may not have stated it clearly: there is no semantic problem at the
>>> Flink implementation level. Rather, there may be a "time dependence"
>>> issue here. [1]
>>>
>>> Yes, my initial answer was not to use this form of join in this
>>> scenario, but Henry said he had converted the table into a stream table
>>> and asked about the feasibility of other methods.
>>>
>>> [1]: "Designing Data-Intensive Applications" by Martin Kleppmann, Part
>>> 3: Derived Data, Chapter 11: Stream Processing, Stream Joins.
>>>
>>> Some of the content:
>>>
>>> *If the ordering of events across streams is undetermined, the join
>>> becomes nondeterministic [87], which means you cannot rerun the same
>>> job on the same input and necessarily get the same result: the events
>>> on the input streams may be interleaved in a different way when you run
>>> the job again.*
>>>
>>> On Tue, Sep 25, 2018 at 8:08 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I don't think that using the current join implementation in the Table
>>>> API / SQL will work.
>>>> The non-windowed join fully materializes *both* input tables in state.
>>>> This is necessary because the join needs to be able to process updates
>>>> on either side.
>>>> While this is not a problem for the fixed-size MySQL table,
>>>> materializing the append-only table (aka the stream) is probably not
>>>> what you want.
>>>> You also cannot limit idle state retention, because it would remove
>>>> the MySQL table from state at some point.
>>>>
>>>> The only way to make it work is to use a user-defined TableFunction
>>>> that queries the MySQL table via JDBC.
>>>> However, please note that these calls would be synchronous, blocking
>>>> calls.
>>>>
>>>> @Vino: Why do you think that the stream & stream join is not mature,
>>>> and which problems do you see in the semantics?
>>>> The semantics are correct (standard SQL semantics), and in my opinion
>>>> the implementation is also mature.
>>>> However, you should not use the non-windowed join if any of the input
>>>> tables is ever-growing, because both sides must be held in state. This
>>>> is not an issue of the semantics.
>>>>
>>>> Cheers,
>>>> Fabian
>>>>
>>>> On Tue, Sep 25, 2018 at 2:00 PM vino yang <yanghua1...@gmail.com> wrote:
>>>>
>>>>> Hi Henry,
>>>>>
>>>>> 1) I don't recommend this method very much, but you said that you
>>>>> expect to convert the MySQL table to a stream and then to a Flink
>>>>> table. Under this premise, I said that you can do this by joining two
>>>>> stream tables. But as you know, this join depends on the time period
>>>>> for which state is retained. To make it equivalent to a dimension
>>>>> table, you must permanently retain the state of the stream table that
>>>>> serves as the "dimension table". I only said that modifying the
>>>>> relevant configuration in Flink can do this globally, not for a
>>>>> single table.
>>>>>
>>>>> 2) Imagine that there are one million records in each of the two
>>>>> tables. The records in both tables are just beginning to stream into
>>>>> Flink, and the records of the dimension table have not yet fully
>>>>> arrived. Therefore, your matching results may not be as accurate as
>>>>> directly querying MySQL.
>>>>>
>>>>> In fact, the current stream & stream join is not very mature; there
>>>>> are some semantic concerns. I personally recommend that you go back
>>>>> to a stream/batch (MySQL) join. For more on the underlying
>>>>> principles, I recommend a book, commonly referred to as "DDIA"
>>>>> (Designing Data-Intensive Applications).
>>>>>
>>>>> Thanks, vino.
>>>>>
>>>>> On Tue, Sep 25, 2018 at 5:48 PM 徐涛 <happydexu...@gmail.com> wrote:
>>>>>
>>>>>> Hi Vino,
>>>>>> I do not quite understand some of the sentences below; would you
>>>>>> please explain them in a bit more detail?
>>>>>> 1. "*such as setting the state retention time of one of the tables
>>>>>> to be permanent*": as far as I know, the state retention time is a
>>>>>> global config; I cannot set this property per table.
>>>>>> 2. "*you may not be able to match the results, because the data
>>>>>> belonging to the mysql table is just beginning to play as a
>>>>>> stream*": why is it not able to match the results?
>>>>>>
>>>>>> Best,
>>>>>> Henry
>>>>>>
>>>>>> On Sep 25, 2018, at 5:29 PM, vino yang <yanghua1...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Henry,
>>>>>>
>>>>>> If you have converted the MySQL table to a Flink stream table, then
>>>>>> in Flink Table/SQL a stream-stream join can also do this, for
>>>>>> example by setting the state retention time of one of the tables to
>>>>>> be permanent. But when the job has just started, you may not be able
>>>>>> to match results, because the data belonging to the MySQL table is
>>>>>> only beginning to be replayed as a stream.
>>>>>>
>>>>>> Thanks, vino.
>>>>>>
>>>>>> On Tue, Sep 25, 2018 at 5:10 PM 徐涛 <happydexu...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Vino & Hequn,
>>>>>>> I am now using the Table/SQL API. If I import the MySQL table as a
>>>>>>> stream and then convert it into a table, it seems that this can
>>>>>>> also be a workaround for batch/streaming joining. May I ask how
>>>>>>> this differs from the UDTF method? Does this implementation have
>>>>>>> any defects?
>>>>>>> Best,
>>>>>>> Henry
>>>>>>>
>>>>>>> On Sep 22, 2018, at 10:28 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> +1 for vino's answer.
>>>>>>> Also, this kind of join will be supported in FLINK-9712
>>>>>>> <https://issues.apache.org/jira/browse/FLINK-9712>. You can find
>>>>>>> more details in the JIRA.
>>>>>>>
>>>>>>> Best, Hequn
>>>>>>>
>>>>>>> On Fri, Sep 21, 2018 at 4:51 PM vino yang <yanghua1...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Henry,
>>>>>>>>
>>>>>>>> There are three ways I can think of:
>>>>>>>>
>>>>>>>> 1) use the DataStream API and implement a flatMap UDF to access
>>>>>>>> the dimension table;
>>>>>>>> 2) use the Table/SQL API and implement a UDTF to access the
>>>>>>>> dimension table;
>>>>>>>> 3) customize the Table/SQL join API/statement's implementation
>>>>>>>> (and change the physical plan).
>>>>>>>>
>>>>>>>> Thanks, vino.
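[As a side note on option 2 above: the per-record lookup that a dimension-table UDTF performs can be sketched in plain Python. This is an illustration only, not Flink code; the dict stands in for a MySQL table that would really be queried synchronously over JDBC, and all names and data are made up.]

```python
# Dimension store: a dict standing in for a MySQL table reached via JDBC.
dimension_table = {
    1: {"name": "alice", "city": "Berlin"},
    2: {"name": "bob", "city": "Hangzhou"},
}

def lookup_join(fact_stream):
    """Enrich each fact record with its dimension row (inner-join style)."""
    for user_id, amount in fact_stream:
        dim = dimension_table.get(user_id)  # one blocking lookup per record
        if dim is not None:                 # facts with no match are dropped
            yield {"user": dim["name"], "city": dim["city"], "amount": amount}

facts = [(1, 10), (2, 20), (3, 30)]  # user 3 has no dimension row
enriched = list(lookup_join(facts))
```

Note how the dimension side is never materialized in the join's own state; each fact pays the cost of a synchronous lookup instead, which is exactly the trade-off Fabian describes.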
>>>>>>>>
>>>>>>>> On Fri, Sep 21, 2018 at 4:43 PM 徐涛 <happydexu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>> Sometimes a "dimension table" needs to be joined with the "fact
>>>>>>>>> table", if the data are not joined before being sent to Kafka.
>>>>>>>>> So if the data are joined in Flink, does the "dimension table"
>>>>>>>>> have to be imported as a stream, or are there other ways to
>>>>>>>>> achieve this?
>>>>>>>>> Thanks a lot!
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Henry
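[Editor's note on the DDIA passage quoted earlier in the thread: the nondeterminism it describes can be shown with a minimal plain-Python sketch. This is an illustration only, not Flink code; it models a join that enriches each fact with the *latest seen* dimension value, whose output depends on how the two streams happen to interleave.]

```python
def latest_value_join(interleaved_events):
    """Events are ('dim', value) updates or ('fact', id) records.

    Each fact is joined with the most recent dimension value seen so far.
    """
    latest = None
    out = []
    for kind, payload in interleaved_events:
        if kind == "dim":
            latest = payload          # remember the newest dimension value
        else:
            out.append((payload, latest))  # join fact with latest value
    return out

# The same four events, interleaved two different ways:
run_a = [("dim", "v1"), ("fact", 1), ("dim", "v2"), ("fact", 2)]
run_b = [("dim", "v1"), ("dim", "v2"), ("fact", 1), ("fact", 2)]

print(latest_value_join(run_a))  # [(1, 'v1'), (2, 'v2')]
print(latest_value_join(run_b))  # [(1, 'v2'), (2, 'v2')]
```

Rerunning the "job" on the same events but a different interleaving changes the result, which is the order dependence DDIA warns about and which the event-time versioned join of FLINK-9712 avoids.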