Hi Henry,

Fabian is right. You can use a window join if you want a bounded join.

From your description, I think what you want is (correct me if I'm wrong):
- only join data within 3 days,
- compute the score in a bounded way,
- retract a previous score once it is more than 3 days old.

So I think window join + bounded OVER may solve your problem. Do the window join with a predicate like `article.time BETWEEN praise.time - INTERVAL '3' DAY AND praise.time + INTERVAL '3' DAY`. You don't have to add a sliding window before the window join. After the window join, you can perform a bounded OVER aggregation with a 3-day interval to get the scores.
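A rough sketch of how the two steps could look (untested; the table and column names `article`, `praise`, `rowtime`, `praise_id` are placeholders for your schema, and it assumes both tables have event-time attributes defined):

    // time-windowed join: only combine articles and praises that are at most 3 days apart
    var joined = tableEnv.sqlQuery(
      s"SELECT a.article_id, p.praise_id, p.rowtime AS ptime " +
      s"FROM article a, praise p " +
      s"WHERE a.article_id = p.article_id " +
      s"AND a.rowtime BETWEEN p.rowtime - INTERVAL '3' DAY AND p.rowtime + INTERVAL '3' DAY")
    tableEnv.registerTable("joined", joined)

    // bounded OVER aggregation: for each joined row, count the praises of the
    // same article within the preceding 3 days
    var scores = tableEnv.sqlQuery(
      s"SELECT article_id, " +
      s"  COUNT(praise_id) OVER ( " +
      s"    PARTITION BY article_id " +
      s"    ORDER BY ptime " +
      s"    RANGE BETWEEN INTERVAL '3' DAY PRECEDING AND CURRENT ROW) AS praise_cnt " +
      s"FROM joined")

Rows older than 3 days simply fall out of the OVER range, so the score stays bounded without adding an extra sliding window.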
The documentation has more details on window joins [1] and OVER aggregations [2].

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#aggregations

On Tue, Aug 28, 2018 at 9:59 PM 徐涛 <happydexu...@gmail.com> wrote:

> Hi Fabian,
> I am working on an application that computes the "score" of an article from the number of praises and reduces the score over time. I am weighing two choices:
> 1. Use a global (non-window) join of article and praise with 3 days of state retention. But I cannot get the current time (the time is fixed when the program starts), so I cannot compute the reduced score. I would have to sink the data and then run cron jobs to update the score.
> 2. Use a sliding window join with a window length of 3 days, sliding by one minute. This way I can get the window end time, but so much data is duplicated across windows that there are performance issues.
> Neither choice is good enough, so I am wondering whether there are other solutions. Thanks a lot.
>
> Best
> Henry
>
> On Aug 28, 2018, at 8:05 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi,
>
> Currently, Flink's window operators require increasing timestamp attributes. This limitation exists so that a window operator can clean up its state. A join operator does not preserve the order of timestamps. Hence, the timestamp attributes lose their monotonicity property and a window operator cannot be applied afterwards.
>
> Have you tried to use a window join? Window joins preserve the timestamp order.
>
> Fabian
>
> 徐涛 <happydexu...@gmail.com> wrote on Tue., Aug 28, 2018, 11:42:
>
>> Hi Hequn,
>> "You can't use window or other bounded operators after a non-window join. The time attribute fields cannot be passed through because of a semantic conflict."
>> Why does Flink have this limitation?
>> I have a temp view
>>
>> var finalTable = tableEnv.sqlQuery(s"select * from
>>   A join B on xxxx
>>   join C on xxxx ")
>> tableEnv.registerTable("finalTable", finalTable)
>>
>> And I want to window this table because I want it to output one minute of data every second; however, obviously I cannot do this now. May I ask how I can make a "final table" that outputs one minute per second? And if a table is a retract stream, will the items added to the window be retracted as well?
>>
>> Thanks a lot.
>>
>> Best
>> Henry
>>
>>
>> On Aug 22, 2018, at 10:30 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>> Hi Henry,
>>
>> As for choice 1:
>>
>> - The state size of a join depends on its input table sizes, not the result table, so the state size of the join in choice 1 depends on how many article ids, praise ids, and response ids there are.
>> - A non-window join also merges identical rows in its state, i.e. <Row, RowCnt>, so the state size won't grow if you keep pouring in the same article id. I think the problem here is that you need a distinct before the join, so that a praise id won't join multiple identical article ids, which would affect the correctness of the result.
>> - I think you need to do the aggregation before the join to make sure the result is correct, because there are duplicated article ids after article joins praise, and this affects the value of count(r.response_id).
>> - You can't use window or other bounded operators after a non-window join. The time attribute fields cannot be passed through because of a semantic conflict.
>> - A hop window with a large fixed duration and a small hop interval should be avoided, because data is duplicated across the windows. For example, a hopping window of 15 minutes size and 5 minute hop interval assigns each row to 3 different windows of 15 minutes size.
>>
>> As for choice 2:
>>
>> - I think you need another field (for example, HOP_START) when joining the three tables, so that you only join records in the same window.
>>
>> To solve your problem, I think we can do a non-window group by first and then join the three result tables. Furthermore, the state retention time can be set to keep the state from growing too large.
>>
>> Best, Hequn
>>
>> On Tue, Aug 21, 2018 at 10:07 PM 徐涛 <happydexu...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>> So maybe I cannot join a table that is generated from a window, because the table gets larger and larger as time goes on, and the system may crash one day.
>>>
>>> I am working on a system that calculates the "score" of an article, which consists of the count of article praises, the count of article responses, etc. Because I cannot use Flink to store all the articles, I decided to only update the score of articles created within the last 3 days.
>>>
>>> I have two choices:
>>> 1. Join the article, praise, and response tables, then window:
>>>
>>> select a.article_id, count(p.praise_id) as pCount, count(r.response_id) as rCount
>>> from article a
>>> left join praise p on a.article_id = p.article_id
>>> left join response r on a.article_id = r.article_id
>>> group by hop(updated_time, interval '1' minute, interval '3' day), article_id
>>>
>>> 2. Window the article table, window the praise table, window the response table, then join them together:
>>>
>>> select aAggr.article_id, pAggr.pCount, rAggr.rCount
>>> from (select article_id from article group by hop(updated_time, interval '1' minute, interval '3' day), article_id) aAggr
>>> left join (select article_id, count(praise_id) as pCount from praise group by hop(updated_time, interval '1' minute, interval '3' day), article_id) pAggr on aAggr.article_id = pAggr.article_id
>>> left join (select article_id, count(response_id) as rCount from response group by hop(updated_time, interval '1' minute, interval '3' day), article_id) rAggr on aAggr.article_id = rAggr.article_id
>>>
>>> Maybe I should choose 1, join then window, not window then join. Please correct me if I am wrong.
>>>
>>> I have some worries about choice 1. I do not know how Flink works internally; it seems that in the SQL, the article, praise, and response tables grow as time goes by. Will this introduce performance issues?
>>>
>>> Best,
>>> Henry
>>>
>>> On Aug 21, 2018, at 9:29 PM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>
>>> Hi Henry,
>>>
>>> praiseAggr is an append table, so it contains "100,101,102,100,101,103,100".
>>> 1. If you change your SQL to s"SELECT article_id FROM praise GROUP BY article_id", the answer is "101,102,103".
>>> 2. If you change your SQL to s"SELECT last_value(article_id) FROM praise", the answer is "100".
>>>
>>> Best, Hequn
>>>
>>> On Tue, Aug 21, 2018 at 8:52 PM, 徐涛 <happydexu...@gmail.com> wrote:
>>>
>>>> Hi Fabian,
>>>> Thanks for your response. This question has puzzled me for quite a long time.
>>>> If praiseAggr has the following values:
>>>> window-1: 100,101,102
>>>> window-2: 100,101,103
>>>> window-3: 100
>>>>
>>>> the last time the article table joins praiseAggr, which of the following values does the praiseAggr table hold?
>>>> 1. 100,101,102,100,101,103,100 (all the elements of all the windows)
>>>> 2. 100 (only the elements of the latest window)
>>>> 3. 101,102,103 (the distinct values across all the windows)
>>>>
>>>> Best,
>>>> Henry
>>>>
>>>> On Aug 21, 2018, at 8:02 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> The semantics of a query do not depend on the way it is used. praiseAggr is a table that grows by one row per second and article_id. If you use that table in a join, the join will fully materialize the table. This is a special case because the same row is added multiple times, so the state won't grow that quickly, but performance will decrease because each row from article will join with a growing number of rows from praiseAggr.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2018-08-21 12:19 GMT+02:00 徐涛 <happydexu...@gmail.com>:
>>>>
>>>>> Hi All,
>>>>>
>>>>> var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id FROM praise GROUP BY HOP(updated_time, INTERVAL '1' SECOND, INTERVAL '3' MINUTE), article_id")
>>>>> tableEnv.registerTable("praiseAggr", praiseAggr)
>>>>>
>>>>> var finalTable = tableEnv.sqlQuery(s"SELECT 1 FROM article a join praiseAggr p on a.article_id=p.article_id")
>>>>> tableEnv.registerTable("finalTable", finalTable)
>>>>>
>>>>> I know that praiseAggr, if written to a sink, is in append mode. So if a table joins praiseAggr, what does that table "see": a table containing only the latest values, or a table that grows larger and larger? If it is the latter, will it introduce performance problems?
>>>>> Thanks a lot.
>>>>>
>>>>> Best,
>>>>> Henry