[ https://issues.apache.org/jira/browse/FLINK-6081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15949857#comment-15949857 ]
radu commented on FLINK-6081:
-----------------------------

[~fhueske] [~shijinkui] [~Yuhong_kyo] [~sunjincheng121] [~twalthr] [~stefano.bortoli]

Trying, as before, to sum up the discussion so far:

1) We would implement the Sort for the simple append case and deliver this to master. This mainly covers the cases where there is only a simple ordering (sorting). It is interesting when we use it on top of the execution query to filter the number of outputs we want to emit.
-> I will make a simple implementation of the LogicalSort for master (to support the filtering (pagination) of the output results).

2) For the case when the ORDER BY is accompanied by OFFSET/FETCH, retraction is needed. In particular, if we imagine examples such as emitting 2 results from the last hour, then as the stream progresses we would need to emit a renewed set of 2 events and a retraction for the one(s) that need to be canceled. Because of this, the emission of the result would be accompanied by a retraction marker for the other operators. This would only apply in the case of inner queries. Hence this feature would be delivered initially in the Flink branch that implements retraction.

> Offset/Fetch support for SQL Streaming
> --------------------------------------
>
>         Key: FLINK-6081
>         URL: https://issues.apache.org/jira/browse/FLINK-6081
>     Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>    Reporter: radu
> Attachments: offset.png
>
> Time target: Proc Time
>
> The main scope of OFFSET/FETCH is pagination support. In the context of streaming, OFFSET and FETCH make sense within the scope of certain window constructs, as they refer to data buffered from the stream (the main usage being to restrict the output that is shown at a certain moment). Therefore they should be applied to the output of the types of windows supported by the ORDER BY clause. Moreover, in accordance with SQL best practices, they can only be used together with an ORDER BY clause.
>
> SQL targeted query examples:
> ----------------------------
>
> Window defined based on the GROUP BY clause:
> Q1:
> {code}
> SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL '3' HOUR) ORDER BY b OFFSET n ROWS
> {code}
>
> Window defined based on WHERE clause time boundaries:
> Q2:
> {code}
> SELECT a FROM stream1 WHERE procTime() BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp ORDER BY b OFFSET n
> {code}
>
> ~~Window defined as sliding windows (aggregates)~~
> Q3:
> {code}
> SELECT SUM(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING b OFFSET n ROWS) FROM stream1
> {code}
>
> Comment: Supporting OFFSET over sliding windows (within the window) does not make sense, because the main scope of OFFSET/FETCH is pagination support. This functionality should therefore only be supported in relation to the output of a query. Hence, Q3 will not be supported.
>
> The general grammar (Calcite version) for OFFSET/FETCH with the available parameters is shown below:
> {code}
> Select […]
> [ ORDER BY orderItem [, orderItem ]* ]
> [ OFFSET start { ROW | ROWS } ]
> [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ]
> {code}
>
> Description
> -----------
>
> OFFSET and FETCH are primarily used for pagination support (i.e., restricting the output that is shown at some point). They were mainly designed to support displaying content on web pages.
> Building on this scenario, we can imagine a similar role for OFFSET and FETCH for streams whose contents are displayed via a web page. In such a scenario the number of outputs to be displayed would be limited using these operators (probably for pagination and aesthetic reasons). However, since any stream application naturally evolves in time, the operators' output should evolve with the update rate of the application. The fact that there is an update rate and a collection of events related to a stream points to window constructs. Therefore the OFFSET/FETCH functionality would be tied to the window mechanisms/boundaries defined by the query. Hence, when the window construct is re-triggered, the output would be filtered again, in terms of cardinality, based on the logic of OFFSET/FETCH.
>
> Because the primary purpose is supporting pagination (and controlling the number of outputs), we limit the usage of OFFSET/FETCH to window constructs that relate to the output. Because of this, supporting them on sliding windows with query aggregates (e.g., query example Q3) would not make sense. Additionally, there is an implicit need for an ordering clause, since OFFSET and FETCH refer to ordering positions. That is why these functions would be supported only if an ORDER BY clause is present.
>
> Functionality example
> ---------------------
>
> We exemplify the usage of OFFSET below with the following query. The event schema is of the form (a, b).
> {code}
> SELECT a FROM stream1 GROUP BY CEIL(proctime TO HOUR) ORDER BY b OFFSET 2 ROWS
> {code}
>
> ||Proctime||IngestionTime(Event)||Stream1||Output||
> | |10:00:01|(a1, 7)| |
> | |10:05:00|(c1, 2)| |
> | |10:12:00|(b1, 5)| |
> | |10:50:00|(d1, 2)| |
> |10-11| | |b1, a1|
> | |11:03:00|(a2, 10)| |
> |11-12| | |nil|
> |...| | | |
>
> Implementation options
> ----------------------
>
> There are 2 options to implement the logic of OFFSET/FETCH:
>
> 1) Within the logic of the window (i.e., the sorting window)
>
> As for the sorting support (ORDER BY clause), and considering that the SQL operators will be associated with window boundaries, the functionality is implemented within the logic of the window as follows. We extract the window boundaries and the window type from the query logic. These are used to define the type of the window and the triggering policy. The logic of the query (i.e., the sorting of the events) is in turn implemented within the window function. In addition, the logic for filtering the output based on the cardinality logic of OFFSET/FETCH is added. With this implementation, the logic of OFFSET and FETCH is combined with that of the ORDER BY clause. As ORDER BY is always required, this does not impose any implementation restrictions.
>
> 2) Within the logic of a filter/flatMap function with a state counter for the outputs
>
> Instead of adding the logic within the window function, the filtering can be done in a standalone operator that only counts outputs and emits the ones that fall within the OFFSET/FETCH range. To provide this functionality we need a flatMap function in which we count the results. The OFFSET/FETCH condition is translated into an IF condition that, based on the position of each output, decides whether to emit it. However, the counter would need to be reset in accordance with the triggering of the window, which makes the implementation tedious.
> This is despite the fact that this implementation option would directly translate the output-filtering logic of the operators from relational SQL.
>
> We recommend option 1 for implementation. For option 1 we reuse the ORDER BY implementation entirely and just add:
> 1) A counter for indexing the outputs
> 2) An if condition to emit an output only if the corresponding index counter falls within the scope defined by the OFFSET/FETCH
>
> !offset.png!
>
> General logic of the operator
> -----------------------------
>
> inputDataStream.window(new \[Slide/Tumble\]\[Time/Count\]Window())
>   //.trigger(new \[Time/Count\]Trigger()) – use default
>   //.evictor(new \[Time/Count\]Evictor()) – use default
>   .apply(SortAndCountFilter());
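Below is a minimal, illustrative sketch of what option 1's SortAndCountFilter could look like as a non-keyed window function for the processing-time tumbling-window case: it sorts the rows buffered in the window on the ORDER BY field and then applies the index counter plus if-condition for OFFSET/FETCH. The row type Tuple2<String, Integer> (fields a and b) and the hard-coded OFFSET/FETCH constants are assumptions for illustration, not part of the actual implementation.

{code}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch only: emits the rows of each window that fall within OFFSET/FETCH,
// after sorting them on field b (the ORDER BY key).
public class SortAndCountFilter
        implements AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {

    // Assumed constants; in a real implementation these would come from the query.
    private static final long OFFSET = 2L;            // OFFSET 2 ROWS
    private static final long FETCH = Long.MAX_VALUE; // no FETCH clause -> emit all remaining rows

    @Override
    public void apply(TimeWindow window,
                      Iterable<Tuple2<String, Integer>> values,
                      Collector<Tuple2<String, Integer>> out) {
        // 1) Reuse the ORDER BY logic: sort the buffered rows of this window on field b.
        List<Tuple2<String, Integer>> sorted = new ArrayList<>();
        for (Tuple2<String, Integer> value : values) {
            sorted.add(value);
        }
        sorted.sort(Comparator.comparing((Tuple2<String, Integer> t) -> t.f1));

        // 2) Counter for indexing the outputs, plus the OFFSET/FETCH if-condition.
        long index = 0;
        for (Tuple2<String, Integer> row : sorted) {
            if (index >= OFFSET && index - OFFSET < FETCH) {
                out.collect(row);
            }
            index++;
        }
    }
}
{code}

Such a function would be attached to the window roughly as in the pseudocode above, e.g. inputDataStream.windowAll(TumblingProcessingTimeWindows.of(Time.hours(1))).apply(new SortAndCountFilter()) for a non-keyed hourly tumbling window; only the emission step differs from the plain ORDER BY implementation.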