[ https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15942809#comment-15942809 ]
ASF GitHub Bot commented on FLINK-5653:
---------------------------------------

Github user huawei-flink commented on the issue:

    https://github.com/apache/flink/pull/3574

Hi @fhueske, @sunjincheng121,

let me try to explain my perspective on this specific case (row based, proc time). This is for the purpose of discussion, to show that we have been thinking about this topic for a while now.

In the case of the row range, the "serialization savings" coming from MapState exist only up to the point at which the "buffer" is filled. After that, we need to start retracting to keep the value correct, and to do that we need to deserialize all the objects.

As @rtudoran mentioned, we implemented a version using a Queue object. This has many advantages:

- it removes the object from the buffer at the right moment, freeing memory on the go (without any iteration over the key set)
- it has an O(1) data access pattern, without any "key resolution costs" and no list iteration
- it keeps the natural processing order by design, without the need to index objects with timestamps
- the experiments we ran show that there is no difference for windows up to 100k elements, and after that the queue seems to be more efficient (as the key resolution does not come for free).

The map state may have a slight advantage in the early stages, when the window is not filled, but afterwards it just introduces useless operations. Furthermore, the need to index objects with a creation timestamp (more memory wasted) and to deal with sequential access (List) to get the most recent object, when you can actually just use the natural arrival order, seems a useless complication. Applying Occam's razor, there should be no doubt about which solution we should select first. The serialization optimization while the window gets filled sounds like a premature optimization that is not worth it in the long run. The further implementation of SQL operators (e.g. LIMIT, OFFSET, etc.) can just benefit from the fact that the state is already sorted, whereas the map would need to be sorted all the time.

Of course I am talking specifically about the procTime semantic operations. eventTime is another story anyway. The map state has minor advantages in the beginning (as the serialization costs are small anyway); the queue state has advantages in executions running steadily, because of the access pattern and the natural buffer cleansing.

These are my two cents on the discussion.

> Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
> --------------------------------------------------------------------
>
>          Key: FLINK-5653
>          URL: https://issues.apache.org/jira/browse/FLINK-5653
>      Project: Flink
>   Issue Type: Sub-task
>   Components: Table API & SQL
>     Reporter: Fabian Hueske
>     Assignee: Stefano Bortoli
>
> The goal of this issue is to add support for OVER ROWS aggregations on
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING
> AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING
> AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5656)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some
> of the restrictions are trivial to address, we can add the functionality in
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with
> RexOver expression).

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
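
For readers following the queue-versus-map discussion, the queue-based retraction idea in the comment can be illustrated with a minimal sketch. It is plain Python and deliberately does not use Flink's state API; the class name `RowsOverSum` and its methods are hypothetical, chosen only for illustration, and the sketch covers just the `SUM(b) ... ROWS BETWEEN 2 PRECEDING AND CURRENT ROW` case for a single partition.

```python
from collections import deque


class RowsOverSum:
    """Running SUM over the last `preceding` rows plus the current row.

    Hypothetical illustration only; not Flink code and not the PR's implementation.
    """

    def __init__(self, preceding):
        self.capacity = preceding + 1  # PRECEDING rows + CURRENT ROW
        self.buffer = deque()          # rows kept in arrival (processing-time) order
        self.total = 0

    def add(self, value):
        # Accumulate the newly arrived row.
        self.buffer.append(value)
        self.total += value
        # Once the window is full, retract the oldest row in O(1),
        # without any key lookup or iteration over the buffer.
        if len(self.buffer) > self.capacity:
            self.total -= self.buffer.popleft()
        return self.total


agg = RowsOverSum(preceding=2)
results = [agg.add(b) for b in [1, 2, 3, 4, 5]]
print(results)
```

An actual operator would presumably keep one such buffer per PARTITION BY key and would need a different strategy for non-retractable aggregates such as MIN; both points are outside this sketch.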