Hi Fabian,

I am working on an application that computes the "score" of an article from the number of praises it receives and reduces that score over time. I am weighing two options:

1. Use a global window join of the article and article praise tables, with 3 days of state retention. With this option I cannot get the current time, because the time is fixed when the program starts, so I cannot compute the reduced score. I would have to sink the data and then write crontab jobs to update the score.
2. Use a sliding window join with a window length of 3 days, sliding by one minute. With this option I can get the window end time, but so much data is duplicated across windows that there are performance issues.

Neither option is good enough, so I am wondering whether there are other solutions.

Thanks a lot.

Best,
Henry

> On Aug 28, 2018, at 8:05 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi,
>
> Currently, Flink's window operators require increasing timestamp attributes.
> This limitation exists so that the state of a window operator can be cleaned up.
> A join operator does not preserve the order of timestamps. Hence, timestamp
> attributes lose their monotonicity property and a window operator cannot be
> applied.
>
> Have you tried to use a window join? These preserve the timestamp order.
>
> Fabian
>
> 徐涛 <happydexu...@gmail.com> wrote on Tue, Aug 28, 2018, 11:42:
> Hi Hequn,
> You wrote: "You can't use window or other bounded operators after non-window join.
> The time attribute fields can not be passed through because of semantic
> conflict."
> Why does Flink have this limitation?
> I have a temp view:
>
> var finalTable = tableEnv.sqlQuery(s"select * from A join B on xxxx join C on xxxx")
> tableEnv.registerTable("finalTable", finalTable)
>
> I want to window this table because I want it to output 1 minute
> per second, but obviously I cannot do this now. How can I make
> a "final table" output 1 minute per second? And if a table is a retract
> stream, will the items added to the window be retracted as well?
>
> Thanks a lot.
>
> Best,
> Henry
>
>> On Aug 22, 2018, at 10:30 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>> Hi Henry,
>>
>> As for choice 1:
>> The state size of a join depends on its input table size, not on the result
>> table, so the state size of the join in choice 1 depends on how many article ids,
>> praise ids and response ids there are.
>> Also, a non-window join merges identical rows in its state, i.e., <Row, RowCnt>,
>> so the state size won't grow if you keep pouring in the same article id. I think
>> the problem here is that you need a distinct before the join, so that a praise id
>> won't join multiple identical article ids, and this will influence the correctness
>> of the result.
>> I think you need to aggregate before the join to ensure the correctness of
>> the result, because there are duplicated article ids after article joins
>> praise, and this will influence the value of count(r.response_id).
>> You can't use window or other bounded operators after a non-window join. The
>> time attribute fields cannot be passed through because of a semantic conflict.
>> A hop window with a large fixed duration and a small hop interval should be
>> avoided. Data will be redundant across windows. For example, a hopping
>> window of 15 minutes size and 5 minute hop interval assigns each row to 3
>> different windows of 15 minute size.
>> As for choice 2:
>> I think you need another field (for example, HOP_START) when joining the three
>> tables. Only join records in the same window.
>> To solve your problem, I think we can do a non-window group by first and then
>> join the three result tables. Furthermore, a state retention time can be set to
>> keep the state from growing larger.
>>
>> Best, Hequn
>>
>> On Tue, Aug 21, 2018 at 10:07 PM 徐涛 <happydexu...@gmail.com> wrote:
>> Hi Fabian,
>> So maybe I cannot join a table that is generated from a window, because
>> the table gets larger and larger as time goes on, and maybe the system
>> will crash one day.
>>
>> I am working on a system that calculates the "score" of an article, which
>> consists of the count of article praises, the count of article responses, etc.
>> Because I cannot use Flink to store all the articles, I decided to update
>> only the scores of articles created within the last 3 days.
>>
>> I have two choices:
>> 1. join the article table with the praise table and the response table, then window
>>    select a.article_id, count(p.praise_id) as pCount,
>>        count(r.response_id) as rCount
>>    from
>>        article a
>>        left join
>>        praise p on a.article_id = p.article_id
>>        left join
>>        response r on a.article_id = r.article_id
>>    group by hop(updated_time, interval '1' minute, interval '3' day), a.article_id
>> 2.
>>    window the article table, window the praise table, window the
>>    response table, then join them together
>>    select aAggr.article_id, pAggr.pCount, rAggr.rCount
>>    from
>>        (select article_id from article
>>         group by hop(updated_time, interval '1' minute, interval '3' day), article_id) aAggr
>>        left join
>>        (select article_id, count(praise_id) as pCount from praise
>>         group by hop(updated_time, interval '1' minute, interval '3' day), article_id) pAggr
>>        on aAggr.article_id = pAggr.article_id
>>        left join
>>        (select article_id, count(response_id) as rCount from response
>>         group by hop(updated_time, interval '1' minute, interval '3' day), article_id) rAggr
>>        on aAggr.article_id = rAggr.article_id
>>
>> Maybe I should choose 1, join then window, rather than window then join.
>>
>> Please correct me if I am wrong.
>>
>> I have some worries about choosing 1.
>> I do not know how Flink works internally. It seems that in the SQL, the
>> article, praise and response tables keep growing as time goes by.
>> Will that introduce performance issues?
>>
>> Best,
>> Henry
>>
>>> On Aug 21, 2018, at 9:29 PM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>
>>> Hi Henry,
>>>
>>> praiseAggr is an append table, so it contains "100,101,102,100,101,103,100".
>>> 1. If you change your SQL to s"SELECT article_id FROM praise GROUP BY
>>>    article_id", the answer is "101,102,103".
>>> 2. If you change your SQL to s"SELECT last_value(article_id) FROM praise",
>>>    the answer is "100".
>>>
>>> Best, Hequn
>>>
>>> On Tue, Aug 21, 2018 at 8:52 PM, 徐涛 <happydexu...@gmail.com> wrote:
>>> Hi Fabian,
>>> Thanks for your response. This question has puzzled me for quite a long
>>> time.
>>> If praiseAggr has the following values:
>>> window-1 100,101,102
>>> window-2 100,101,103
>>> window-3 100
>>>
>>> then the last time the article table joins praiseAggr, which of the
>>> following values does the praiseAggr table have?
>>> 1. 100,101,102,100,101,103,100    all the elements of all the windows
>>> 2. 100                            the elements of the latest window
>>> 3. 101,102,103                    the distinct values of all the windows
>>>
>>> Best,
>>> Henry
>>>
>>>> On Aug 21, 2018, at 8:02 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> The semantics of a query do not depend on the way that it is used.
>>>> praiseAggr is a table that grows by one row per second and article_id. If
>>>> you use that table in a join, the join will fully materialize the table.
>>>> This is a special case because the same row is added multiple times, so
>>>> the state won't grow that quickly, but the performance will decrease
>>>> because each row from article will join with multiple (a growing
>>>> number of) rows from praiseAggr.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2018-08-21 12:19 GMT+02:00 徐涛 <happydexu...@gmail.com>:
>>>> Hi All,
>>>> var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id FROM praise GROUP BY HOP(updated_time, INTERVAL '1' SECOND, INTERVAL '3' MINUTE), article_id")
>>>> tableEnv.registerTable("praiseAggr", praiseAggr)
>>>> var finalTable = tableEnv.sqlQuery(s"SELECT 1 FROM article a join praiseAggr p on a.article_id=p.article_id")
>>>> tableEnv.registerTable("finalTable", finalTable)
>>>> I know that praiseAggr, if written to a sink, is in append mode. So if a
>>>> table joins praiseAggr, is what that table "sees" a table containing only the
>>>> latest values, or a table that grows larger and larger? If it is the latter,
>>>> will it introduce a performance problem?
>>>> Thanks a lot.
>>>>
>>>> Best,
>>>> Henry
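
For reference, the redundancy that Hequn describes for hop windows can be estimated with simple arithmetic: when the window size is an exact multiple of the hop interval, each row is assigned to size / hop windows. The sketch below is plain Scala, not Flink API. `HopWindowFanOut` and `windowsPerRow` are hypothetical names introduced only for illustration, and the code assumes the size divides evenly by the hop.

```scala
// Hypothetical helper, not part of Flink: counts how many hopping (sliding)
// windows a single row falls into, assuming the window size is an exact
// multiple of the hop interval.
object HopWindowFanOut {
  def windowsPerRow(sizeSeconds: Long, hopSeconds: Long): Long = {
    require(hopSeconds > 0, "hop interval must be positive")
    require(sizeSeconds % hopSeconds == 0, "size must be a multiple of the hop interval")
    sizeSeconds / hopSeconds
  }

  def main(args: Array[String]): Unit = {
    val minute = 60L
    val day = 24 * 60 * minute
    // 15-minute window, 5-minute hop (Hequn's example)
    println(windowsPerRow(15 * minute, 5 * minute))
    // 3-minute window, 1-second hop (the praiseAggr query)
    println(windowsPerRow(3 * minute, 1L))
    // 3-day window, 1-minute hop (the article score queries)
    println(windowsPerRow(3 * day, minute))
  }
}
```

With the settings mentioned in this thread, that is 3 windows per row for the 15-minute / 5-minute example, 180 for the 3-minute / 1-second praiseAggr query, and 4320 for the 3-day / 1-minute score queries.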