Hi Fabian,
        I am working on a application that compute the “score" of an article by 
the number of praises, and reduce the score by the time, I am balancing on two 
        1. Use global window join the article and article praise, with 3 days 
state retention, but I can not get the current time ,time is fixed when the 
program is started, so I can not compute the reduced score. I have to sink the 
data, then write some crontab jobs to update the score.
        2. Use sliding window join, window length is 3 days , and sliding by 
one minute, this time I can get the window end time, but there so much data 
duplicated in windows, there are performance issues.
        Each choices is not good enough, I am wondering if there are some other 
solves. Thanks a lot.


> 在 2018年8月28日,下午8:05,Fabian Hueske <fhue...@gmail.com> 写道:
> Hi,
> Currently, Flink's window operators require increasing timestamp attributes. 
> This limitation exists to be able to clean up the state of a window operator. 
> A join operator does not preserve the order of timestamps. Hence, timestamp 
> attributes lose their monotonictity property and a window operator cannot be 
> applied.
> Have you tried to use a window join? These preserve the timestamp order. 
> Fabian
> 徐涛 <happydexu...@gmail.com <mailto:happydexu...@gmail.com>> schrieb am Di., 
> 28. Aug. 2018, 11:42:
> Hi Hequn,
>       You can't use window or other bounded operators after non-window join. 
> The time attribute fields can not be passed through because of semantic 
> conflict.
>        Why does Flink have this limitation?
>        I have a temp view
>         var finalTable = tableEnv.sqlQuery(s"select * from
>               A join B on xxxx
>               join C on xxxx  " )
>       tableEnv.registerTable("finalTable", finalTable)
>       And I want to window this table because I want it to output 1 minute 
> per second, however obviously I can not do this now, may I ask how can I make 
> a “final table” to output 1 minute per second? And if a table is a retract 
> stream, will the item added to the window be retracted either?
>       Thanks a lot.   
> Best
> Henry 
>> 在 2018年8月22日,上午10:30,Hequn Cheng <chenghe...@gmail.com 
>> <mailto:chenghe...@gmail.com>> 写道:
>> Hi Hery,
>> As for choise1:
>> The state size of join depends on it's input table size, not the result 
>> table, so the state size of join of choise1 depends on how many article id, 
>> praise id and response_id. 
>> Also non-window join will merge same rows in it's state, i.e, <Row, RowCnt>, 
>> so the state size won't grows if you keep pouring same article id. I think 
>> the problem here is you need a distinct before join, so that a praise id 
>> won't join multi same article ids, and this will influence the correctness 
>> of the result.
>> I think you need do aggregate before join to make sure the correctness of 
>> the result. Because there are duplicated article id after article join 
>> praise and this will influence the value of count(r.response_id).
>> You can't use window or other bounded operators after non-window join. The 
>> time attribute fields can not be passed through because of semantic conflict.
>> Hop window with large fixed duration and small hop interval should be 
>> avoided. Data will be redundant in various windows. For example, a hopping 
>> window of 15 minutes size and 5 minute hop interval assigns each row to 3 
>> different windows of 15 minute size.
>> As for choice2:
>> I think you need another filed(for example, HOP_START) when join the three 
>> tables. Only join records in same window.
>> To solve your problem, I think we can do non-window group by first and then 
>> join three result tables. Furthermore, state retention time can be set to 
>> keep state from growing larger.
>> Best, Hequn
>> On Tue, Aug 21, 2018 at 10:07 PM 徐涛 <happydexu...@gmail.com 
>> <mailto:happydexu...@gmail.com>> wrote:
>> Hi Fabian,
>>      So maybe I can not join a table that generate from a window, because 
>> the table is getting larger and larger as the time goes, maybe the system 
>> will crash one day. 
>>      I am working on a system that calculate the “score" of article, which 
>> is consist of the count of article praise, the count of article response, etc
>>      Because I can not use flink to save all the article, I decide to update 
>> the score of the article that created in 3 days.
>>      I have two choises,
>>      1. join the article table and praise table, response table then window
>>              select a.article_id, count(p.praise_id) as pCount, 
>> count(r.response_id) as rCount
>>              from
>>                      article a
>>              left join
>>                      praise p on a.article_id = p.article_id
>>              left join
>>                      response r on a.article_id = r.article_id
>>              group by hop(updated_time, interval '1' minute,interval '3' 
>> day) , article_id
>>      2. window the article table, window the priase table, window the 
>> response table ,then join them together
>>              select aAggr.article_id, pAggr.pCount, rAggr.rCount
>>              (select article_id from article group by hop(updated_time, 
>> interval '1' minute,interval '3' day) , article_id) aAggr
>>              left join
>>              (select article_id,count(praise_id) as pCount from praise group 
>> by hop(updated_time, interval '1' minute,interval '3' day) , article_id) 
>> pAggr on aAggr.article_id=pAggr.article_id
>>              left join
>>              (select article_id,count(response_id) as rCount from response 
>> group by hop(updated_time, interval '1' minute,interval '3' day) , 
>> article_id) rAggr on aAggr.article_id=rAggr.article_id
>>      Maybe I should choose 1,   join then window, but not window then join.  
>>      Please correct me if I am wrong.
>>      I have some worries when choose 1,
>>      I do not know how Flink works internally, it seems that in the sql , 
>> table article ,table praise, table response is growing as the time goes by, 
>> will it introduce performance issue? 
>> Best,
>> Henry
>>> 在 2018年8月21日,下午9:29,Hequn Cheng <chenghe...@gmail.com 
>>> <mailto:chenghe...@gmail.com>> 写道:
>>> Hi Henry,
>>> praiseAggr is an append table, so it contains "100,101,102,100,101,103,100".
>>> 1. if you change your sql to s"SELECT article_id FROM praise GROUP BY 
>>> article_id", the answer is "101,102,103"
>>> 2. if you change your sql to s"SELECT last_value(article_id) FROM praise", 
>>> the answer is "100" 
>>> Best, Hequn
>>> On Tue, Aug 21, 2018 at 8:52 PM, 徐涛 <happydexu...@gmail.com 
>>> <mailto:happydexu...@gmail.com>> wrote:
>>> Hi Fabian,
>>>     Thanks for your response. This question puzzles me for quite a long 
>>> time.
>>>     If the praiseAggr has the following value:
>>>     window-1     100,101,102
>>>     window-2            100,101,103
>>>     window-3            100
>>>     the last time the article table joins praiseAggr, which of the 
>>> following value does praiseAggr table has?
>>>     1—      100,101,102,100,101,103,100           collect all the element 
>>> of all the window
>>>     2—  100                                                    the element 
>>> of the latest window
>>>     3—  101,102,103                                    the distinct value 
>>> of all the window
>>> Best,
>>> Henry
>>>> 在 2018年8月21日,下午8:02,Fabian Hueske <fhue...@gmail.com 
>>>> <mailto:fhue...@gmail.com>> 写道:
>>>> Hi,
>>>> The semantics of a query do not depend on the way that it is used.
>>>> praiseAggr is a table that grows by one row per second and article_id. If 
>>>> you use that table in a join, the join will fully materialize the table.
>>>> This is a special case because the same row is added multiple times, so 
>>>> the state won't grow that quickly, but the performance will decrease 
>>>> because for each row from article will join with multiple (a growing 
>>>> number) of rows from praiseAggr.
>>>> Best, Fabian
>>>> 2018-08-21 12:19 GMT+02:00 徐涛 <happydexu...@gmail.com 
>>>> <mailto:happydexu...@gmail.com>>:
>>>> Hi All,
>>>>    var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id FROM praise 
>>>> article_id" )
>>>>    tableEnv.registerTable("praiseAggr", praiseAggr)
>>>>     var finalTable = tableEnv.sqlQuery(s”SELECT 1 FROM article a join 
>>>> praiseAggr p on a.article_id=p.article_id" )
>>>>     tableEnv.registerTable("finalTable", finalTable)
>>>>     I know that praiseAggr, if written to sink, is append mode , so if a 
>>>> table joins praiseAggr, what the table “see”, is a table contains the 
>>>> latest value, or a table that grows larger and larger? If it is the later, 
>>>> will it introduce performance problem?
>>>>     Thanks a lot.
>>>> Best, 
>>>> Henry

Reply via email to