Hi Henry,

Fabian is right. You can try a window join if you want a bounded join.

From your description, I think what you want is (correct me if I'm
wrong):
- Only join data within 3 days
- Calculate the score in a bounded way
- Retract previous scores that are more than 3 days old

So I think a window join followed by a bounded OVER aggregation may solve
your problem. Do the window join with a condition such as
`article.time BETWEEN praise.time - INTERVAL '3' DAY AND praise.time +
INTERVAL '3' DAY`. You don't have to add a sliding window before the window
join. After the window join, you can perform a bounded OVER aggregation
with a 3-day interval to get the scores. The documentation covers window
joins [1] and OVER aggregations [2].
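
To make the semantics concrete, here is a minimal plain-Python sketch (not
Flink code) of what a window join followed by a bounded OVER count would
compute. The function names, the sample timestamps and the
(time, article_id) row layout are assumptions made up for illustration;
only the 3-day bound comes from your description.

```python
from datetime import datetime, timedelta

THREE_DAYS = timedelta(days=3)

def window_join(articles, praises, bound=THREE_DAYS):
    """Keep a praise only if a matching article's time lies within +/- bound of it."""
    joined = []
    for praise_time, praise_article_id in praises:
        for article_time, article_id in articles:
            if (article_id == praise_article_id
                    and praise_time - bound <= article_time <= praise_time + bound):
                joined.append((praise_time, article_id))
    return sorted(joined)

def bounded_over_count(joined, bound=THREE_DAYS):
    """For each joined row, count same-article rows in the preceding `bound` (inclusive)."""
    result = []
    for row_time, article_id in joined:
        count = sum(1 for t, a in joined
                    if a == article_id and row_time - bound <= t <= row_time)
        result.append((row_time, article_id, count))
    return result

# Hypothetical sample data: one article and four praises for it.
base = datetime(2018, 8, 1)
articles = [(base, 1)]
praises = [
    (base + timedelta(hours=1), 1),
    (base + timedelta(days=2), 1),
    (base + timedelta(days=3), 1),
    (base + timedelta(days=5), 1),  # more than 3 days from the article: not joined
]

joined = window_join(articles, praises)
scores = bounded_over_count(joined)
for row in scores:
    print(row)
```

With this sample data, the praise 5 days after the article falls outside
the join bound, and the three remaining rows get the counts 1, 2 and 3.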

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#aggregations

On Tue, Aug 28, 2018 at 9:59 PM 徐涛 <happydexu...@gmail.com> wrote:

> Hi Fabian,
> I am working on an application that computes the "score" of an article
> from the number of praises and reduces the score over time. I am weighing
> two choices:
> 1. Use a global window join of the article and article praise tables, with
> 3 days of state retention. But I cannot get the current time: the time is
> fixed when the program is started, so I cannot compute the reduced score.
> I would have to sink the data and then write some crontab jobs to update
> the score.
> 2. Use a sliding window join with a window length of 3 days, sliding by
> one minute. This time I can get the window end time, but so much data is
> duplicated across windows that there are performance issues.
> Neither choice is good enough. I am wondering if there are other
> solutions. Thanks a lot.
>
> Best
> Henry
>
> On Aug 28, 2018, at 8:05 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi,
>
> Currently, Flink's window operators require increasing timestamp
> attributes. This limitation exists to be able to clean up the state of a
> window operator. A join operator does not preserve the order of timestamps.
> Hence, timestamp attributes lose their monotonicity property and a window
> operator cannot be applied.
>
> Have you tried to use a window join? These preserve the timestamp order.
>
> Fabian
>
> 徐涛 <happydexu...@gmail.com> wrote on Tue, Aug 28, 2018, 11:42:
>
>> Hi Hequn,
>> You can't use window or other bounded operators after non-window join.
>> The time attribute fields can not be passed through because of semantic
>> conflict.
>>        Why does Flink have this limitation?
>>        I have a temp view:
>>        var finalTable = tableEnv.sqlQuery(
>>          s"select * from A join B on xxxx join C on xxxx")
>>        tableEnv.registerTable("finalTable", finalTable)
>>
>>       I want to window this table because I want it to output 1 minute
>> per second. Obviously I cannot do this now, so may I ask how I can make
>> a "final table" output 1 minute per second? And if a table is a retract
>> stream, will the items added to the window be retracted as well?
>>
>>       Thanks a lot.
>>
>> Best
>> Henry
>>
>>
>>
>> On Aug 22, 2018, at 10:30 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>> Hi Henry,
>>
>> As for choice 1:
>>
>>    - The state size of a join depends on its input table sizes, not on
>>    the result table, so the state size of the join in choice 1 depends
>>    on how many article ids, praise ids and response ids there are.
>>    - Also, a non-window join merges identical rows in its state, i.e.,
>>    <Row, RowCnt>, so the state size won't grow if you keep pouring in
>>    the same article id. I think the problem here is that you need a
>>    distinct before the join, so that a praise id won't join multiple
>>    identical article ids, which would affect the correctness of the
>>    result.
>>    - I think you need to aggregate before the join to ensure the
>>    correctness of the result, because there are duplicated article ids
>>    after article joins praise, and this will affect the value
>>    of count(r.response_id).
>>    - You can't use window or other bounded operators after a non-window
>>    join. The time attribute fields cannot be passed through because of
>>    a semantic conflict.
>>    - A hop window with a large fixed duration and a small hop interval
>>    should be avoided, because data will be redundant across windows. For
>>    example, a hopping window of 15 minutes size and 5 minutes hop
>>    interval assigns each row to 3 different windows of 15 minutes size.
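
The redundancy can be checked with a small plain-Python sketch (not Flink
code). `hop_windows` is a hypothetical helper that lists the start times of
all hopping windows [start, start + size) containing a timestamp. Times are
plain integers in minutes, and window starts are assumed to be aligned to
multiples of the hop interval.

```python
def hop_windows(ts, size, slide):
    """Return the start times of all hopping windows [start, start + size) containing ts."""
    start = ts - (ts % slide)      # latest aligned window start that is <= ts
    starts = []
    while start > ts - size:       # the window still covers ts
        starts.append(start)
        start -= slide
    return sorted(starts)

# 15-minute windows hopping every 5 minutes: a row at minute 17
print(hop_windows(17, 15, 5))

# 3-day windows hopping every minute, as in the queries in this thread
print(len(hop_windows(100000, 3 * 24 * 60, 1)))
```

A row at minute 17 lands in the windows starting at minutes 5, 10 and 15,
while with a 3-day size and a 1-minute hop every row is copied into 4320
windows.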
>>
>> As for choice 2:
>>
>>    - I think you need another field (for example, HOP_START) when
>>    joining the three tables, so that only records in the same window
>>    are joined.
>>
>> To solve your problem, I think we can do a non-window group by first and
>> then join the three result tables. Furthermore, a state retention time
>> can be set to keep the state from growing larger.
>>
>> Best, Hequn
>>
>> On Tue, Aug 21, 2018 at 10:07 PM 徐涛 <happydexu...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>> So maybe I cannot join a table that is generated from a window, because
>>> the table grows larger and larger as time goes by, and maybe the system
>>> will crash one day.
>>>
>>> I am working on a system that calculates the "score" of an article,
>>> which consists of the count of article praises, the count of article
>>> responses, etc. Because I cannot use Flink to keep all the articles, I
>>> decided to update the score only for articles created within 3 days.
>>>
>>> I have two choices:
>>> 1. Join the article, praise and response tables, then window:
>>> select a.article_id, count(p.praise_id) as pCount, count(r.response_id)
>>> as rCount
>>> from
>>> article a
>>> left join
>>> praise p on a.article_id = p.article_id
>>> left join
>>> response r on a.article_id = r.article_id
>>> group by hop(updated_time, interval '1' minute,interval '3' day) ,
>>> article_id
>>> 2. Window the article table, the praise table and the response table,
>>> then join them together:
>>> select aAggr.article_id, pAggr.pCount, rAggr.rCount
>>> from
>>> (select article_id from article group by hop(updated_time, interval '1'
>>> minute,interval '3' day) , article_id) aAggr
>>> left join
>>> (select article_id,count(praise_id) as pCount from praise group by hop(
>>> updated_time, interval '1' minute,interval '3' day) , article_id) pAggr
>>> on aAggr.article_id=pAggr.article_id
>>> left join
>>> (select article_id,count(response_id) as rCount from response group by
>>> hop(updated_time, interval '1' minute,interval '3' day) , article_id)
>>> rAggr on aAggr.article_id=rAggr.article_id
>>>
>>> Maybe I should choose 1, join then window, rather than window then join.
>>> Please correct me if I am wrong.
>>>
>>> I have some worries about choosing 1.
>>> I do not know how Flink works internally. It seems that in the SQL, the
>>> article, praise and response tables keep growing as time goes by.
>>> Will this introduce performance issues?
>>>
>>> Best,
>>> Henry
>>>
>>> On Aug 21, 2018, at 9:29 PM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>
>>> Hi Henry,
>>>
>>> praiseAggr is an append table, so it contains
>>> "100,101,102,100,101,103,100".
>>> 1. if you change your sql to s"SELECT article_id FROM praise GROUP BY
>>> article_id", the answer is "101,102,103"
>>> 2. if you change your sql to s"SELECT last_value(article_id) FROM
>>> praise", the answer is "100"
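
The three readings can be compared with a short plain-Python sketch (not
Flink code), using the window contents from Henry's example quoted below.
The variable names are made up for illustration. Note that plainly
de-duplicating these example values gives 100, 101, 102, 103, since 100
occurs in every window.

```python
# Window contents from the example in this thread (window-1, window-2, window-3).
windows = [
    [100, 101, 102],
    [100, 101, 103],
    [100],
]

# Append table: every row emitted by every window is kept.
appended = [article_id for window in windows for article_id in window]

# GROUP BY article_id: one row per distinct id.
distinct = sorted(set(appended))

# last_value(article_id): only the most recently emitted id.
latest = appended[-1]

print(appended)
print(distinct)
print(latest)
```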
>>>
>>> Best, Hequn
>>>
>>> On Tue, Aug 21, 2018 at 8:52 PM, 徐涛 <happydexu...@gmail.com> wrote:
>>>
>>>> Hi Fabian,
>>>> Thanks for your response. This question has puzzled me for quite a
>>>> long time.
>>>> If praiseAggr has the following values:
>>>> window-1     100,101,102
>>>> window-2     100,101,103
>>>> window-3     100
>>>>
>>>> the last time the article table joins praiseAggr, which of the
>>>> following values does the praiseAggr table contain?
>>>> 1. 100,101,102,100,101,103,100    all the elements of all the windows
>>>> 2. 100    the elements of the latest window
>>>> 3. 101,102,103    the distinct values of all the windows
>>>>
>>>>
>>>> Best,
>>>> Henry
>>>>
>>>>
>>>> On Aug 21, 2018, at 8:02 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> The semantics of a query do not depend on the way that it is used.
>>>> praiseAggr is a table that grows by one row per second and article_id.
>>>> If you use that table in a join, the join will fully materialize the table.
>>>> This is a special case because the same row is added multiple times, so
>>>> the state won't grow that quickly, but the performance will decrease
>>>> because each row from article will join with multiple (a growing
>>>> number of) rows from praiseAggr.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2018-08-21 12:19 GMT+02:00 徐涛 <happydexu...@gmail.com>:
>>>>
>>>>> Hi All,
>>>>> var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id FROM praise
>>>>> GROUP BY HOP(updated_time, INTERVAL '1' SECOND,INTERVAL '3' MINUTE) ,
>>>>> article_id" )
>>>>> tableEnv.registerTable("praiseAggr", praiseAggr)
>>>>>
>>>>>     var finalTable = tableEnv.sqlQuery(s"SELECT 1 FROM article a
>>>>> join praiseAggr p on a.article_id=p.article_id")
>>>>>     tableEnv.registerTable("finalTable", finalTable)
>>>>>
>>>>>  I know that praiseAggr, if written to a sink, is in append mode. So if
>>>>> a table joins praiseAggr, is what the table "sees" a table that
>>>>> contains only the latest values, or a table that grows larger and
>>>>> larger? If it is the latter, will it introduce a performance problem?
>>>>>  Thanks a lot.
>>>>>
>>>>>
>>>>> Best,
>>>>> Henry
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>
