[ https://issues.apache.org/jira/browse/FLINK-6081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot reassigned FLINK-6081:
-------------------------------------

    Assignee:     (was: radu)

Offset/Fetch support for SQL Streaming
--------------------------------------

                Key: FLINK-6081
                URL: https://issues.apache.org/jira/browse/FLINK-6081
            Project: Flink
         Issue Type: New Feature
         Components: Table SQL / API
           Reporter: radu
           Priority: Major
             Labels: auto-unassigned
        Attachments: offset.png


Time target: Proc Time

The main scope of OFFSET/FETCH is pagination support. In the context of streaming, OFFSET and FETCH make sense only within the scope of certain window constructs, because they refer to buffered data from the stream (their main use is to restrict the output that is shown at a given moment). Therefore they should be applied to the output of the window types supported by the ORDER BY clause. Moreover, in accordance with SQL best practices, they can only be used together with an ORDER BY clause.

SQL targeted query examples:
----------------------------

Window defined based on the GROUP BY clause
Q1:
{code}
SELECT a FROM stream1
GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL '3' HOUR)
ORDER BY b OFFSET n ROWS
{code}

Window defined based on WHERE clause time boundaries
Q2:
{code}
SELECT a FROM stream1
WHERE procTime() BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp
ORDER BY b OFFSET n ROWS
{code}

~~Window defined as sliding windows (aggregates)~~
Q3:
{code}
SELECT SUM(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING b OFFSET n ROWS) FROM stream1
{code}

Comment: Supporting OFFSET over sliding windows (within the window) does not make sense, because the main scope of OFFSET/FETCH is pagination support. Therefore this functionality should only be supported in relation to the output of a query.
Hence, Q3 will not be supported.

The general grammar (Calcite version) for OFFSET/FETCH with the available parameters is shown below:
{code}
SELECT […]
[ ORDER BY orderItem [, orderItem ]* ]
[ OFFSET start { ROW | ROWS } ]
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY ]
{code}

Description
-----------

OFFSET and FETCH are primarily used for pagination support (i.e., to restrict the output that is shown at some point). They were mainly designed to support displaying content on web pages. Building on this scenario, we can imagine a similar role for OFFSET and FETCH on streams whose contents are displayed via a web page. In such a scenario the number of outputs to be displayed would be limited using these operators (probably for pagination and aesthetic reasons). However, since any stream application evolves naturally in time, the operators' output should evolve with the update rate of the application. The fact that there is an update rate and a collection of events related to a stream points to window constructs. Therefore the OFFSET/FETCH functionality is tied to the window mechanisms/boundaries defined by the query: each time the window construct is re-triggered, the output is filtered again by cardinality according to the OFFSET/FETCH logic.

Because the primary reason for supporting OFFSET/FETCH is pagination (and controlling the number of outputs), we limit their usage to window constructs that are related to the output. For this reason, supporting them on sliding windows with query aggregates (e.g., the Q3 query example) would not make sense. Additionally, there is an implicit need for an ordering clause, because OFFSET and FETCH refer to ordering positions. That is why these functions would be supported only if an ORDER BY clause is present.
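To make the per-window semantics described above concrete, here is a minimal plain-Java sketch, assuming the events buffered for one window firing are available as a list when the window is triggered. The class `OffsetFetchSketch`, its `Event` type (schema (a, b)) and the method `sortAndCountFilter` are hypothetical illustration names, not Flink API; in Flink this logic would have to live inside a window function. A negative `fetch` is an assumed encoding for "no FETCH clause".

{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch (not Flink API): OFFSET/FETCH applied to the rows of one
// window firing, after sorting them by the ORDER BY key.
final class OffsetFetchSketch {

    // Hypothetical event with schema (a, b); b is the ORDER BY key.
    static final class Event {
        final String a;
        final int b;

        Event(String a, int b) {
            this.a = a;
            this.b = b;
        }
    }

    // Sorts one window's events by b, skips `offset` rows and emits at most
    // `fetch` rows (fetch < 0 means no FETCH clause). Returns column a.
    static List<String> sortAndCountFilter(List<Event> window, int offset, int fetch) {
        // Copy so the caller's buffer is not reordered; List.sort is stable,
        // so rows with equal b keep their arrival order.
        List<Event> sorted = new ArrayList<>(window);
        sorted.sort(Comparator.comparingInt((Event e) -> e.b));

        List<String> out = new ArrayList<>();
        int index = 0; // position counter, local to this firing
        for (Event e : sorted) {
            boolean afterOffset = index >= offset;
            boolean withinFetch = fetch < 0 || index < offset + fetch;
            if (afterOffset && withinFetch) {
                out.add(e.a);
            }
            index++;
        }
        return out;
    }
}
{code}

With the events (a1, 7), (c1, 2), (b1, 5), (d1, 2) and offset 2 without FETCH, the sketch returns b1, a1. Because the position counter is local to a single call, nothing has to be reset between window firings.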
Functionality example
---------------------

We illustrate the usage of OFFSET with the following query. The event schema is (a, b).
{code}
SELECT a FROM stream1
GROUP BY CEIL(proctime TO HOUR)
ORDER BY b OFFSET 2 ROWS
{code}

||Proctime||IngestionTime(Event)||Stream1||Output||
| |10:00:01|(a1, 7)| |
| |10:05:00|(c1, 2)| |
| |10:12:00|(b1, 5)| |
| |10:50:00|(d1, 2)| |
|10-11| | |b1, a1|
| |11:03:00|(a2, 10)| |
|11-12| | |nil|
|...| | | |

In the 10-11 window the events sorted by b are c1 (2), d1 (2), b1 (5), a1 (7); skipping the first two rows leaves b1, a1. The 11-12 window holds a single event, so OFFSET 2 leaves nothing to emit.

Implementation option
---------------------

There are two options for implementing the OFFSET/FETCH logic:

1) Within the logic of the window (i.e., a sorting window)
As with sorting support (ORDER BY clause), and considering that the SQL operators will be associated with window boundaries, the functionality will be implemented within the logic of the window as follows. We extract the window boundaries and window type from the query; these define the window type and its triggering policy. The logic of the query (i.e., the sorting of the events) is in turn implemented within the window function. In addition, the logic for filtering the output according to the cardinality semantics of OFFSET/FETCH is added there. With this implementation, the OFFSET/FETCH logic is combined with that of the ORDER BY clause. As ORDER BY is always required, this does not impose any implementation restrictions.

2) Within the logic of a filter/flatMap function with a state counter for outputs
Instead of adding the logic within the window functions, the filtering can be done in a standalone operator that only counts outputs and emits those that fall within the OFFSET/FETCH range. To provide this functionality we need a flatMap function in which we count the results. The OFFSET/FETCH condition would be translated into an IF condition, applied to the position of each output, that decides whether to emit it.
However, the counter would need to be reset in accordance with the triggering of the window, which makes the implementation tedious, even though this option would directly translate the output filtering logic of the relational SQL operators.

We recommend option 1 for implementation. For option 1 we fully reuse the ORDER BY implementation and just add:
1) A counter for indexing the outputs
2) An IF condition that emits an output only if its index falls within the range defined by OFFSET/FETCH

!offset.png!

General logic of OFFSET/FETCH
-----------------------------
{code}
inputDataStream.window(new [Slide/Tumble][Time/Count]Window())
    //.trigger(new [Time/Count]Trigger())   // use default
    //.evictor(new [Time/Count]Evictor())   // use default
    .apply(new SortAndCountFilter());
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)