Hi,

Currently, Flink's window operators require increasing timestamp attributes. This limitation exists so that the state of a window operator can be cleaned up. A join operator does not preserve the order of timestamps. Hence, timestamp attributes lose their monotonicity property and a window operator cannot be applied.
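A minimal sketch of how a regular join can break timestamp order, written as plain Scala with no Flink dependency. The names `JoinOrderSketch`, `Event` and `joinedTimestamps` are made up for illustration, and the model (keep every row of both inputs, emit a result as soon as a new row finds a match) is only an assumption about how a non-windowed join behaves, not Flink's implementation.

```scala
// Toy model, NOT Flink code: all names here are hypothetical.
object JoinOrderSketch {
  final case class Event(key: String, ts: Long)

  // `arrivals` is the interleaved arrival order of two inputs:
  // Left(e) is a row from input A, Right(e) is a row from input B.
  // Returns the A-side timestamp of every emitted join result, in emission order.
  def joinedTimestamps(arrivals: Seq[Either[Event, Event]]): Vector[Long] = {
    var seenA = Vector.empty[Event] // join state for input A
    var seenB = Vector.empty[Event] // join state for input B
    var out = Vector.empty[Long]
    arrivals.foreach {
      case Left(a) =>
        seenA = seenA :+ a
        // a new A row joins with every stored B row that has the same key
        out = out ++ seenB.filter(_.key == a.key).map(_ => a.ts)
      case Right(b) =>
        seenB = seenB :+ b
        // a new B row joins with every stored A row that has the same key
        out = out ++ seenA.filter(_.key == b.key).map(_.ts)
    }
    out
  }

  // True if the sequence never goes backwards.
  def isNonDecreasing(xs: Seq[Long]): Boolean =
    xs.zip(xs.drop(1)).forall { case (x, y) => x <= y }

  def main(args: Array[String]): Unit = {
    // Input A has timestamps 1, 2 and input B has 3, 4: each input is ordered.
    val arrivals: Seq[Either[Event, Event]] = Seq(
      Left(Event("k1", 1L)),
      Left(Event("k2", 2L)),
      Right(Event("k2", 3L)),
      Right(Event("k1", 4L))
    )
    val ts = joinedTimestamps(arrivals)
    println(ts)                  // prints Vector(2, 1)
    println(isNonDecreasing(ts)) // prints false
  }
}
```

Both inputs arrive in timestamp order, yet the joined rows carry the A-side timestamps 2 and then 1, so a downstream window operator could no longer rely on increasing timestamps.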
Have you tried to use a window join? These preserve the timestamp order.

Fabian

徐涛 <happydexu...@gmail.com> wrote on Tue, Aug 28, 2018, 11:42:

> Hi Hequn,
> You can't use window or other bounded operators after non-window join. The
> time attribute fields can not be passed through because of semantic
> conflict.
> Why does Flink have this limitation?
> I have a temp view
> var finalTable = tableEnv.sqlQuery(s"select * from
> A join B on xxxx
> join C on xxxx " )
> tableEnv.registerTable("finalTable", finalTable)
>
> I want to window this table because I want it to output the last 1 minute
> of data every second. Obviously I cannot do this now, so may I ask how I
> can make a "final table" that outputs 1 minute of data every second? And
> if a table is a retract stream, will the items added to the window be
> retracted as well?
>
> Thanks a lot.
>
> Best
> Henry
>
> On Aug 22, 2018, at 10:30 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>
> Hi Henry,
>
> As for choice 1:
>
> - The state size of the join depends on the size of its input tables, not
>   the result table, so the state size of the join in choice 1 depends on
>   how many article ids, praise ids and response ids there are.
> - Also, a non-window join merges identical rows in its state, i.e., <Row,
>   RowCnt>, so the state size won't grow if you keep pouring in the same
>   article id. I think the problem here is that you need a distinct before
>   the join, so that a praise id won't join multiple identical article ids,
>   which would affect the correctness of the result.
> - I think you need to aggregate before the join to ensure the correctness
>   of the result, because there are duplicated article ids after article
>   joins praise, and this affects the value of count(r.response_id).
> - You can't use window or other bounded operators after a non-window
>   join. The time attribute fields cannot be passed through because of
>   semantic conflict.
> - Hop windows with a large fixed duration and a small hop interval should
>   be avoided. Data will be redundant in various windows.
>   For example, a hopping window of 15 minutes size and 5 minutes hop
>   interval assigns each row to 3 different windows of 15 minutes size.
>
> As for choice 2:
>
> - I think you need another field (for example, HOP_START) when joining the
>   three tables. Only join records in the same window.
>
> To solve your problem, I think we can do a non-window group by first and
> then join the three result tables. Furthermore, state retention time can
> be set to keep the state from growing larger.
>
> Best, Hequn
>
> On Tue, Aug 21, 2018 at 10:07 PM 徐涛 <happydexu...@gmail.com> wrote:
>
>> Hi Fabian,
>> So maybe I cannot join a table that is generated from a window, because
>> the table keeps getting larger as time goes on, and maybe the system will
>> crash one day.
>>
>> I am working on a system that calculates the "score" of an article, which
>> consists of the count of article praises, the count of article responses,
>> etc. Because I cannot use Flink to store all the articles, I decided to
>> update the score only for articles created within the last 3 days.
>>
>> I have two choices:
>> 1. Join the article table with the praise table and the response table,
>> then window:
>> select a.article_id, count(p.praise_id) as pCount, count(r.response_id)
>> as rCount
>> from
>> article a
>> left join
>> praise p on a.article_id = p.article_id
>> left join
>> response r on a.article_id = r.article_id
>> group by hop(updated_time, interval '1' minute, interval '3' day),
>> article_id
>> 2. Window the article table, window the praise table, window the response
>> table, then join them together:
>> select aAggr.article_id, pAggr.pCount, rAggr.rCount
>> from
>> (select article_id from article group by hop(updated_time, interval '1'
>> minute, interval '3' day), article_id) aAggr
>> left join
>> (select article_id, count(praise_id) as pCount from praise group by hop(
>> updated_time, interval '1' minute, interval '3' day), article_id) pAggr
>> on aAggr.article_id=pAggr.article_id
>> left join
>> (select article_id, count(response_id) as rCount from response group by
>> hop(updated_time, interval '1' minute, interval '3' day), article_id)
>> rAggr on aAggr.article_id=rAggr.article_id
>>
>> Maybe I should choose 1, join then window, rather than window then join.
>> Please correct me if I am wrong.
>>
>> I have some worries about choice 1:
>> I do not know how Flink works internally. It seems that in the SQL, the
>> article, praise and response tables keep growing as time goes by. Will
>> that introduce performance issues?
>>
>> Best,
>> Henry
>>
>> On Aug 21, 2018, at 9:29 PM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>> Hi Henry,
>>
>> praiseAggr is an append table, so it contains
>> "100,101,102,100,101,103,100".
>> 1. If you change your SQL to s"SELECT article_id FROM praise GROUP BY
>> article_id", the answer is "100,101,102,103".
>> 2. If you change your SQL to s"SELECT last_value(article_id) FROM
>> praise", the answer is "100".
>>
>> Best, Hequn
>>
>> On Tue, Aug 21, 2018 at 8:52 PM, 徐涛 <happydexu...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>> Thanks for your response. This question has puzzled me for quite a long
>>> time. If praiseAggr has the following values:
>>> window-1 100,101,102
>>> window-2 100,101,103
>>> window-3 100
>>>
>>> the last time the article table joins praiseAggr, which of the following
>>> values does the praiseAggr table have?
>>> 1. 100,101,102,100,101,103,100   all the elements of all the windows
>>> 2. 100                           the elements of the latest window
>>> 3. 100,101,102,103               the distinct values of all the windows
>>>
>>>
>>> Best,
>>> Henry
>>>
>>>
>>> On Aug 21, 2018, at 8:02 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> The semantics of a query do not depend on the way that it is used.
>>> praiseAggr is a table that grows by one row per second and article_id.
>>> If you use that table in a join, the join will fully materialize the
>>> table. This is a special case because the same row is added multiple
>>> times, so the state won't grow that quickly, but the performance will
>>> decrease because each row from article will join with multiple (a
>>> growing number of) rows from praiseAggr.
>>>
>>> Best, Fabian
>>>
>>> 2018-08-21 12:19 GMT+02:00 徐涛 <happydexu...@gmail.com>:
>>>
>>>> Hi All,
>>>> var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id FROM praise
>>>> GROUP BY HOP(updated_time, INTERVAL '1' SECOND, INTERVAL '3' MINUTE),
>>>> article_id" )
>>>> tableEnv.registerTable("praiseAggr", praiseAggr)
>>>>
>>>> var finalTable = tableEnv.sqlQuery(s"SELECT 1 FROM article a join
>>>> praiseAggr p on a.article_id=p.article_id" )
>>>> tableEnv.registerTable("finalTable", finalTable)
>>>>
>>>> I know that praiseAggr, if written to a sink, is in append mode. So if
>>>> a table joins praiseAggr, what does that table "see": a table that
>>>> contains only the latest value, or a table that grows larger and
>>>> larger? If it is the latter, will it introduce a performance problem?
>>>> Thanks a lot.
>>>>
>>>>
>>>> Best,
>>>> Henry
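
To put numbers on the hopping-window remark in the thread (15 minutes size, 5 minutes hop, 3 windows per row), here is a small sketch in plain Scala with no Flink dependency. `HopWindowSketch` and `windowStarts` are hypothetical names, and the alignment of windows to multiples of the hop interval starting at 0 is an assumption made for the illustration; it is not taken from Flink's source.

```scala
// Toy model, NOT Flink code: all names here are hypothetical.
object HopWindowSketch {
  // Start times of all hopping windows [start, start + size) that contain `ts`.
  // Assumes window starts are multiples of `slide` and that ts >= 0.
  // All three arguments use the same time unit (minutes in the examples below).
  def windowStarts(ts: Long, size: Long, slide: Long): Seq[Long] = {
    require(size > 0 && slide > 0 && ts >= 0, "size and slide must be positive, ts non-negative")
    val lastStart = ts - (ts % slide) // latest window start that is <= ts
    // walk backwards one hop at a time while the window still covers ts
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toVector
  }

  def main(args: Array[String]): Unit = {
    // 15 minutes size, 5 minutes hop: a row at minute 17 lands in 3 windows
    println(windowStarts(17L, 15L, 5L)) // prints Vector(15, 10, 5)

    // 3 days size (4320 minutes), 1 minute hop, as in hop(updated_time,
    // interval '1' minute, interval '3' day): 4320 windows per row
    println(windowStarts(100000L, 3L * 24 * 60, 1L).size) // prints 4320
  }
}
```

Under these assumptions every row is copied into size / hop windows, which is why a 3-day window that hops every minute is far more expensive than the 15-minute example.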