[ https://issues.apache.org/jira/browse/FLINK-6081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946776#comment-15946776 ]
radu commented on FLINK-6081:
-----------------------------

Considering that Calcite does not currently support syntax such as GROUP BY HOP/TUMBLE/... to define the time, the proposal is to first consider only the queries in which the ORDER BY clause contains the time grouping. An example of a query to be considered is:

SELECT a2 FROM s2 ORDER BY CEIL(proctime() TO hour) ASC
+ the corresponding attributes (offset, fetch, ...)

which translates to

LogicalSort(sort0=[$1], dir0=[ASC])
  LogicalProject(a2=[$0], EXPR$1=[1970-01-01 00:00:00])
    LogicalTableScan(table=[[s2]])

The alternative is to move the time specification into a GROUP BY clause, such as

SELECT a2 FROM s2 GROUP BY CEIL(proctime() TO HOUR),a2 ORDER BY CEIL(proctime() to hour) OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY

which leads to

LogicalSort(sort0=[$1], dir0=[ASC], offset=[2], fetch=[2])
  LogicalAggregate(group=[{1}])
    LogicalProject(a2=[$0], EXPR$1=[1970-01-01 00:00:00])
      LogicalTableScan(table=[[s2]])

...this can be optimized such that the LogicalSort is combined with the LogicalAggregate.

I would propose to focus on option one (without GROUP BY).

[~fhueske] - what do you think?

> Offset/Fetch support for SQL Streaming
> --------------------------------------
>
>                 Key: FLINK-6081
>                 URL: https://issues.apache.org/jira/browse/FLINK-6081
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: radu
>         Attachments: offset.png
>
>
> Time target: Proc Time
> The main purpose of OFFSET/FETCH is pagination support. In the context
> of streaming, OFFSET and FETCH make sense within the scope of certain
> window constructs, as they refer to buffered data from the stream
> (mainly to restrict the output that is shown at a certain moment).
> Therefore they should be applied to the output of the types of windows
> supported by the ORDER BY clauses. Moreover, in accordance with SQL
> best practices, they can only be used with an ORDER BY clause.
> SQL targeted query examples:
> ----------------------------
> Window defined based on group by clause
> ```Q1: SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR,
> INTERVAL '3' HOUR) ORDER BY b OFFSET n ROWS ```
> Window defined based on where clause time boundaries
> ```Q2: SELECT a FROM stream1 WHERE procTime() BETWEEN
> current_timestamp - INTERVAL '1' HOUR AND current_timestamp
> ORDER BY b OFFSET n ROWS ```
> ~~Window defined as sliding windows (aggregates)~~
> ``` Q3: ~~SELECT SUM(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR
> PRECEDING b OFFSET n ROWS) FROM stream1~~ ```
> Comment: Supporting OFFSET over sliding windows (within the window) does
> not make sense because the main purpose of OFFSET/FETCH is pagination
> support. Therefore this functionality should only be supported in
> relation to the output of a query. Hence, Q3 will not be supported.
> The general grammar (Calcite version) for OFFSET/FETCH with available
> parameters is shown below:
> ```
> Select […]
> [ ORDER BY orderItem [, orderItem ]* ]
> [ OFFSET start { ROW | ROWS } ]
> [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ]
> ```
> Description
> -----------
> Offset and Fetch are primarily used for pagination support (i.e., to
> restrict the output that is shown at some point). They were mainly
> designed to support web page display of the contents. Building on this
> scenario we can imagine a similar role for OFFSET and FETCH for streams
> that would display contents via a web page. In such a scenario the
> number of outputs to be displayed would be limited using such operators
> (probably for pagination and aesthetic reasons). However, because any
> stream application naturally evolves over time, the operators' output
> should evolve with the update rate of the application. The fact that
> there is an update rate and a collection of events related to a stream
> points to window constructs.
> Therefore the OFFSET/FETCH functionality would be tied to the window
> mechanisms/boundaries defined by the query. Hence, whenever the window
> construct is re-triggered, the output is filtered again by cardinality
> according to the logic of OFFSET/FETCH.
> Because the primary purpose is supporting pagination (and controlling
> the number of outputs), we limit the usage of OFFSET/FETCH to window
> constructs that are related to the output. For this reason, supporting
> them on sliding windows with query aggregates (e.g., the Q3 query
> example) would not make sense. Additionally, there is an implicit need
> for an ordering clause because OFFSET and FETCH refer to ordering
> positions. That is why these functions would be supported only if an
> ORDER BY clause is present.
> Functionality example
> ---------------------
> We exemplify the usage of OFFSET below using the following query. Event
> schema is in the form (a,b).
> ``` SELECT a FROM stream1 GROUP BY CEIL(proctime TO HOUR) ORDER BY b
> OFFSET 2 ROWS ```
> ||Proctime|| IngestionTime(Event)|| Stream1|| Output||
> | |10:00:01| (a1, 7)| |
> | |10:05:00| (c1, 2)| |
> | |10:12:00| (b1,5)| |
> | |10:50:00| (d1,2)| |
> |10-11| | |b1,a1|
> | |11:03:00| (a2,10)| |
> |11-12| | |nil|
> |...|
> Implementation option
> ---------------------
> There are 2 options to implement the logic of OFFSET/FETCH:
> 1) Within the logic of the window (i.e. sorting window)
> Similar to the sorting support (ORDER BY clause), considering that the
> SQL operators will be associated with window boundaries, the
> functionality will be implemented within the logic of the window as
> follows. We extract the window boundaries and window type from the query
> logic. These will be used to define the type of the window and the
> triggering policy. The logic of the query (i.e., the sorting of the
> events) will in turn be implemented within the window function.
> In addition to this, the logic for filtering the output based on the
> cardinality logic of OFFSET/FETCH will be added. With this
> implementation the logic of OFFSET and FETCH is combined with that of
> the ORDER BY clause. As ORDER BY is always required, this does not
> impose any implementation restrictions.
> 2) Within the logic of a filter/flatMap function (with a state counter
> for outputs)
> Instead of adding the logic within the window functions, the filtering
> can be done within a standalone operator that only counts outputs and
> emits the ones that fall within the logic of the OFFSET/FETCH. To
> provide this functionality we need to use a flatMap function in which we
> count the results. The OFFSET/FETCH condition would be transposed into
> the condition of an IF, applied based on the order of the output, to
> emit the output. However, the counter would need to be reset in
> accordance with the triggering of the window, which makes the
> implementation tedious. This is despite the fact that this
> implementation option would directly translate the output filtering
> logic of the operators from relational SQL.
> We recommend option 1 for implementation.
> Therefore for option 1 we reuse entirely the ORDER BY implementation and
> just add:
> 1) A counter for indexing the outputs
> 2) An if condition to emit the output only if the corresponding index
> counter falls within the scope defined by the OFFSET/FETCH
> !offset.png!
> General logic of Join
> ---------------------
> inputDataStream.window(new [Slide/Tumble][Time/Count]Window())
> //.trigger(new [Time/Count]Trigger()) – use default
> //.evictor(new [Time/Count]Evictor()) – use default
> .apply(SortAndCountFilter());

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
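
The recommended option 1 in the issue description (reuse the ORDER BY logic, then add an index counter and an emit condition) can be illustrated with a minimal sketch in plain Java. It does not use any Flink API; the class name `SortAndCountFilterSketch`, the method `sortAndFilter`, and the convention that a negative `fetch` means "no FETCH clause" are all assumptions made for illustration, not part of the proposal. The input reproduces the 10-11 window from the functionality example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch, not Flink code: sort one window's buffer, then apply OFFSET/FETCH. */
public class SortAndCountFilterSketch {

    /** Event with schema (a, b), as in the functionality example. */
    public static final class Event {
        final String a;
        final int b;

        public Event(String a, int b) {
            this.a = a;
            this.b = b;
        }
    }

    /**
     * Sorts the events buffered for one window firing by b (ascending) and
     * returns the a-values whose position lies in [offset, offset + fetch).
     * Assumption of this sketch: a negative fetch means no FETCH clause.
     */
    public static List<String> sortAndFilter(List<Event> windowEvents, long offset, long fetch) {
        List<Event> sorted = new ArrayList<>(windowEvents);
        sorted.sort(Comparator.comparingInt((Event e) -> e.b)); // the ORDER BY b part

        List<String> out = new ArrayList<>();
        long index = 0; // counter indexing the outputs; starts at 0 for every firing
        for (Event e : sorted) {
            boolean pastOffset = index >= offset;
            boolean withinFetch = fetch < 0 || index < offset + fetch;
            if (pastOffset && withinFetch) { // the emit condition
                out.add(e.a);
            }
            index++;
        }
        return out;
    }

    public static void main(String[] args) {
        // Events of the 10-11 window, in arrival order.
        List<Event> window = Arrays.asList(
            new Event("a1", 7), new Event("c1", 2), new Event("b1", 5), new Event("d1", 2));
        System.out.println(sortAndFilter(window, 2, -1)); // OFFSET 2 ROWS -> [b1, a1]
    }
}
```

Because the counter is a local variable of the per-window call, it is reset on every window firing without extra bookkeeping, which is the advantage the description attributes to option 1 over a standalone counting operator.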