Hi Hequn,

	Thanks a lot for your response! This helps me understand the mechanism more clearly.
	I have another question: how do I use Flink to accomplish time attenuation (time decay of the score)? If I use the join plus state-retention-time solution, I only get the incremental data, but some earlier data may need to be recomputed because of the time decay. How do I then flush (re-emit) those results?

Best,
Henry

> On Aug 22, 2018, at 10:30 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
> 
> Hi Henry,
> 
> As for choice 1:
> The state size of a join depends on its input table size, not on the result table, so the state size of the join in choice 1 depends on how many article ids, praise ids and response ids there are.
> Also, a non-window join merges identical rows in its state, i.e. <Row, RowCnt>, so the state size won't grow if you keep pouring in the same article id. I think the problem here is that you need a distinct before the join, so that a praise id won't join multiple identical article ids, which would affect the correctness of the result.
> I think you need to do an aggregate before the join to make sure the result is correct. Because there are duplicated article ids after article joins praise, this would affect the value of count(r.response_id).
> You can't use a window or other bounded operators after a non-window join. The time attribute fields cannot be passed through because of a semantic conflict.
> A hop window with a large fixed duration and a small hop interval should be avoided, because data is duplicated across windows. For example, a hopping window of 15 minutes size and 5 minute hop interval assigns each row to 3 different windows of 15 minutes size.
> As for choice 2:
> I think you need another field (for example, HOP_START) when joining the three tables, so that only records in the same window are joined.
> To solve your problem, I think we can do a non-window group by first and then join the three result tables. Furthermore, the state retention time can be set to keep the state from growing larger.
> 
> Best, Hequn
> 
> On Tue, Aug 21, 2018 at 10:07 PM 徐涛 <happydexu...@gmail.com> wrote:
> Hi Fabian,
> So maybe I cannot join a table that is generated from a window, because that table gets larger and larger as time goes by, and maybe the system will crash one day.
> 
> I am working on a system that calculates the "score" of an article, which consists of the count of article praises, the count of article responses, etc.
> Because I cannot use Flink to store all articles, I decided to only update the scores of articles created in the last 3 days.
> 
> I have two choices:
> 1. Join the article, praise and response tables, then window:
>     select a.article_id, count(p.praise_id) as pCount, count(r.response_id) as rCount
>     from
>         article a
>     left join
>         praise p on a.article_id = p.article_id
>     left join
>         response r on a.article_id = r.article_id
>     group by hop(updated_time, interval '1' minute, interval '3' day), article_id
> 2. Window the article table, window the praise table, window the response table, then join them together:
>     select aAggr.article_id, pAggr.pCount, rAggr.rCount
>     from
>         (select article_id from article group by hop(updated_time, interval '1' minute, interval '3' day), article_id) aAggr
>     left join
>         (select article_id, count(praise_id) as pCount from praise group by hop(updated_time, interval '1' minute, interval '3' day), article_id) pAggr on aAggr.article_id = pAggr.article_id
>     left join
>         (select article_id, count(response_id) as rCount from response group by hop(updated_time, interval '1' minute, interval '3' day), article_id) rAggr on aAggr.article_id = rAggr.article_id
> 
> Maybe I should choose 1, i.e. join then window, and not window then join.
> 
> Please correct me if I am wrong.
> 
> I have some worries about choice 1. I do not know how Flink works internally, but it seems that in this sql the article, praise and response tables keep growing as time goes by; will that introduce performance issues?
> 
> Best,
> Henry
> 
>> On Aug 21, 2018, at 9:29 PM, Hequn Cheng <chenghe...@gmail.com> wrote:
>> 
>> Hi Henry,
>> 
>> praiseAggr is an append table, so it contains "100,101,102,100,101,103,100".
>> 1. If you change your sql to s"SELECT article_id FROM praise GROUP BY article_id", the answer is "101,102,103".
>> 2. If you change your sql to s"SELECT last_value(article_id) FROM praise", the answer is "100".
>> 
>> Best, Hequn
>> 
>> On Tue, Aug 21, 2018 at 8:52 PM, 徐涛 <happydexu...@gmail.com> wrote:
>> Hi Fabian,
>> Thanks for your response. This question has puzzled me for quite a long time.
>> If praiseAggr has the following values:
>>     window-1    100,101,102
>>     window-2    100,101,103
>>     window-3    100
>> 
>> then the last time the article table joins praiseAggr, which of the following values does the praiseAggr table have?
>>     1— 100,101,102,100,101,103,100    all the elements of all the windows
>>     2— 100                            the element of the latest window
>>     3— 101,102,103                    the distinct values of all the windows
>> 
>> Best,
>> Henry
>> 
>>> On Aug 21, 2018, at 8:02 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> The semantics of a query do not depend on the way that it is used. praiseAggr is a table that grows by one row per second and article_id. If you use that table in a join, the join will fully materialize the table. This is a special case because the same row is added multiple times, so the state won't grow that quickly, but the performance will decrease because each row from article will join with multiple (a growing number of) rows from praiseAggr.
>>> 
>>> Best, Fabian
>>> 
>>> 2018-08-21 12:19 GMT+02:00 徐涛 <happydexu...@gmail.com>:
>>> Hi All,
>>>     var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id FROM praise GROUP BY HOP(updated_time, INTERVAL '1' SECOND, INTERVAL '3' MINUTE), article_id")
>>>     tableEnv.registerTable("praiseAggr", praiseAggr)
>>>     var finalTable = tableEnv.sqlQuery(s"SELECT 1 FROM article a join praiseAggr p on a.article_id = p.article_id")
>>>     tableEnv.registerTable("finalTable", finalTable)
>>> I know that praiseAggr, if written to a sink, is in append mode. So if a table joins praiseAggr, what does that table "see": a table that contains only the latest values, or a table that grows larger and larger? If it is the latter, will it introduce performance problems?
>>> Thanks a lot.
>>> 
>>> Best,
>>> Henry
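
For reference, below is a minimal, untested sketch of the approach Hequn suggests above: a plain non-window GROUP BY per table, then a join, with idle state retention to keep the join state bounded. It assumes the Flink 1.6-era Scala Table API. The object name, the in-memory sample data and the extra field name 'title are invented for illustration; the table and column names (article, praise, response, article_id, praise_id, response_id, pCount, rCount) follow the thread.

    // Sketch of "non-window GROUP BY first, then join, plus idle state retention".
    import org.apache.flink.api.common.time.Time
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    object ArticleScoreSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tableEnv = TableEnvironment.getTableEnvironment(env)

        // In-memory sample data standing in for the real article/praise/response sources.
        val articles  = env.fromElements((100L, "a"), (101L, "b"))
        val praises   = env.fromElements((1L, 100L), (2L, 100L), (3L, 101L))
        val responses = env.fromElements((10L, 100L))
        tableEnv.registerDataStream("article", articles, 'article_id, 'title)
        tableEnv.registerDataStream("praise", praises, 'praise_id, 'article_id)
        tableEnv.registerDataStream("response", responses, 'response_id, 'article_id)

        // 1) Aggregate praise/response per article_id with a plain (non-window) GROUP BY,
        //    so the join sees one updating row per article_id instead of every raw event.
        tableEnv.registerTable("praiseAggr", tableEnv.sqlQuery(
          "SELECT article_id, COUNT(praise_id) AS pCount FROM praise GROUP BY article_id"))
        tableEnv.registerTable("responseAggr", tableEnv.sqlQuery(
          "SELECT article_id, COUNT(response_id) AS rCount FROM response GROUP BY article_id"))

        // 2) Join the pre-aggregated tables with article.
        val scores = tableEnv.sqlQuery(
          """
            |SELECT a.article_id, p.pCount, r.rCount
            |FROM article a
            |LEFT JOIN praiseAggr p ON a.article_id = p.article_id
            |LEFT JOIN responseAggr r ON a.article_id = r.article_id
          """.stripMargin)

        // 3) Bound the state: clear per-key state that has been idle for roughly 3 days,
        //    matching the "only score articles from the last 3 days" requirement.
        val qConfig: StreamQueryConfig = tableEnv.queryConfig
          .withIdleStateRetentionTime(Time.days(3), Time.days(4))

        // The result is an updating table, so it is emitted as a retract stream;
        // the Boolean flag marks each Row as an addition or a retraction.
        tableEnv.toRetractStream[Row](scores, qConfig).print()

        env.execute("article-score-sketch")
      }
    }

Note that idle state retention only clears state for keys that stop receiving input; it does not by itself recompute a score whose value should decay with time, which is the "time attenuation" question at the top of this mail and would need some form of periodic re-emission outside the SQL query.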