[ 
https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15942809#comment-15942809
 ] 

ASF GitHub Bot commented on FLINK-5653:
---------------------------------------

Github user huawei-flink commented on the issue:

    https://github.com/apache/flink/pull/3574
  
    Hi @fhueske, @sunjincheng121,
    
    let me try to explain my perspective on this specific case (row-based, proc 
time). This is for the purpose of discussion, to show that we have been thinking 
about this topic for a while now.
    
    In the case of the row range, the "serialization savings" coming from MapState 
exist up to the point at which the "buffer" is filled. After that, we need to start 
retracting to keep the aggregate correct, and to do that we need to deserialize all 
the objects. As @rtudoran mentioned, we implemented a version using a Queue object 
(see the sketch after the list below).
    
    This has many advantages:
    - it removes objects from the buffer at the right moment, freeing memory as it 
goes (without any iteration over the key set)
    - it has an O(1) data access pattern, with no "key resolution" cost and no list 
iteration
    - it keeps the natural processing order by design, without the need to index 
objects with timestamps
    - the experiments we ran show no difference for windows of up to 100k elements, 
and beyond that the queue seems to be more efficient (as key resolution does not 
come for free).
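    
    To make the comparison concrete, here is a minimal, hypothetical sketch of the 
queue-based variant (not the code from this PR): a ProcessFunction that keeps the 
last n+1 values per key in a FIFO deque held in ValueState and maintains a running 
SUM. The class name, the Tuple2<String, Double> input shape and the single SUM 
aggregate are assumptions made for illustration only.
    
    {code}
    // Hypothetical sketch, not the PR code: proc-time ROWS BETWEEN n PRECEDING AND
    // CURRENT ROW for a single SUM over the Double field of a (key, value) input.
    import java.util.ArrayDeque;

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    public class QueueBoundedRowsOverSum
            extends ProcessFunction<Tuple2<String, Double>, Tuple3<String, Double, Double>> {

        private final int precedingRows;                          // the "n PRECEDING" bound
        private transient ValueState<ArrayDeque<Double>> buffer;  // FIFO of the last n+1 values
        private transient ValueState<Double> sum;                 // running accumulator

        public QueueBoundedRowsOverSum(int precedingRows) {
            this.precedingRows = precedingRows;
        }

        @Override
        public void open(Configuration parameters) {
            buffer = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "rowsBuffer", TypeInformation.of(new TypeHint<ArrayDeque<Double>>() {})));
            sum = getRuntimeContext().getState(new ValueStateDescriptor<>("rowsSum", Double.class));
        }

        @Override
        public void processElement(Tuple2<String, Double> row, Context ctx,
                Collector<Tuple3<String, Double, Double>> out) throws Exception {
            ArrayDeque<Double> queue = buffer.value();
            if (queue == null) {
                queue = new ArrayDeque<>();
            }
            double acc = sum.value() == null ? 0.0 : sum.value();

            // accumulate the new row in arrival (processing-time) order
            queue.addLast(row.f1);
            acc += row.f1;

            // once the window is full, retract exactly the oldest row:
            // O(1), no key lookup, and the buffer cleans itself as it goes
            if (queue.size() > precedingRows + 1) {
                acc -= queue.pollFirst();
            }

            buffer.update(queue);
            sum.update(acc);
            out.collect(new Tuple3<>(row.f0, row.f1, acc));
        }
    }
    {code}
    
    For the query in the issue description (2 PRECEDING), this would be applied as 
something like input.keyBy(...).process(new QueueBoundedRowsOverSum(2)). Note that 
with a RocksDB backend the whole deque is (de)serialized on every access, which is 
exactly the trade-off being discussed here.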
    
    The map state may have a slight advantage in the early stages, while the window 
is not yet filled, but after that it just introduces useless operations. 
Furthermore, the need to index objects with a creation timestamp (more memory 
wasted), and dealing with sequential access (a List) to get the most recent object 
when you could simply use the natural arrival order, seems like needless 
complication. Applying Occam's razor, there should be no doubt about which solution 
we should select first. The serialization optimization while the window is being 
filled sounds like a premature optimization that is not worth it in the long run. 
Further implementations of SQL operators (e.g. LIMIT, OFFSET, etc.) can simply 
benefit from the fact that the state is already sorted, whereas the map would need 
to be sorted every time.
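    
    For contrast, here is a hedged sketch of the retraction step with the map-based 
layout discussed above, assuming rows are indexed in a MapState by the processing 
timestamp at which they arrived; the helper class and method names are made up for 
illustration. Retracting the oldest row means scanning the keys and deserializing 
the stored list, which is exactly what the queue avoids.
    
    {code}
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.types.Row;

    public class MapIndexedRetraction {

        /** Removes and returns the oldest buffered row once the window exceeds its bound. */
        public static Row retractOldest(MapState<Long, List<Row>> rowsByTimestamp) throws Exception {
            long oldest = Long.MAX_VALUE;
            for (Long ts : rowsByTimestamp.keys()) {   // key scan that the queue variant avoids
                oldest = Math.min(oldest, ts);
            }
            // several rows can share one processing-time millisecond, hence a List per key;
            // getting it deserializes the whole list
            List<Row> rows = new ArrayList<>(rowsByTimestamp.get(oldest));
            Row retracted = rows.remove(0);            // sequential access to keep arrival order
            if (rows.isEmpty()) {
                rowsByTimestamp.remove(oldest);
            } else {
                rowsByTimestamp.put(oldest, rows);
            }
            return retracted;
        }
    }
    {code}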
    
    Of course, I am talking specifically about the procTime semantics; eventTime is 
another story anyway. The map state has minor advantages in the beginning (where 
the serialization costs are small anyway), while the queue state has advantages in 
steadily running executions, because of the access pattern and the natural buffer 
cleansing.
    
    These are my two cents on the discussion.
    



> Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
> --------------------------------------------------------------------
>
>                 Key: FLINK-5653
>                 URL: https://issues.apache.org/jira/browse/FLINK-5653
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Stefano Bortoli
>
> The goal of this issue is to add support for OVER ROWS aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5656)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
