[ https://issues.apache.org/jira/browse/FLINK-6082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Fabian Hueske updated FLINK-6082:
---------------------------------
Issue Type: New Feature (was: Sub-task)
Parent: (was: FLINK-4557)

Support window definition for SQL Queries based on WHERE clause with time condition
-----------------------------------------------------------------------------------

Key: FLINK-6082
URL: https://issues.apache.org/jira/browse/FLINK-6082
Project: Flink
Issue Type: New Feature
Components: Table API & SQL
Reporter: radu

Time target: Proc Time

The Calcite documentation contains query examples in which the (time) boundaries of a window are defined as conditions within the WHERE clause. Since the Flink community targets compatibility with Calcite, it makes sense to support defining windows this way as well, together with the corresponding aggregations on top of them.

SQL targeted query examples:
----------------------------

```sql
SELECT productId, count(*) FROM stream1
WHERE proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp
```

General comments:

1) The window boundaries are defined as conditions in the WHERE clause.
2) Either rowtime or proctime can be used to indicate which notion of stream time the boundaries refer to.
3) The boundaries are defined with special constructs provided by Calcite: current_timestamp and time operations (e.g. subtracting an INTERVAL).

Description:
------------

The logic of this operator is closely related to the support for aggregates over sliding windows defined with OVER ([FLINK-5653](https://issues.apache.org/jira/browse/FLINK-5653), [FLINK-5654](https://issues.apache.org/jira/browse/FLINK-5654), [FLINK-5655](https://issues.apache.org/jira/browse/FLINK-5655), [FLINK-5658](https://issues.apache.org/jira/browse/FLINK-5658), [FLINK-5656](https://issues.apache.org/jira/browse/FLINK-5656)). The design in those issues considers queries in which the window is defined with the OVER clause syntax and the aggregates are applied over that period.
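
The rolling semantics of the example query can be illustrated outside Flink. The following is a minimal sketch in plain Scala, not Flink's OVER-window operator: for every incoming row it counts the rows whose proctime lies in [proctime - 1 hour, proctime], mirroring `proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp`. `Event`, `RollingProcTimeCount`, `onEvent`, and `toSec` are hypothetical names, and times are modelled as seconds since midnight for simplicity.

```scala
import scala.collection.mutable

// Hypothetical input row: key `a` and the processing time at which it arrived
// (seconds since midnight, for simplicity).
final case class Event(a: String, procTimeSec: Long)

// Sketch of a rolling proc-time window: each incoming event triggers an
// evaluation over all events with procTimeSec in [now - windowSec, now].
final class RollingProcTimeCount(windowSec: Long) {
  // Events currently inside the window, oldest first
  // (assumes events arrive in non-decreasing proctime order).
  private val buffer = mutable.Queue.empty[Event]

  def onEvent(e: Event): (String, Long) = {
    buffer.enqueue(e)
    // Drop events below the lower boundary; BETWEEN is inclusive, so an event
    // exactly windowSec old is kept.
    val lowerBound = e.procTimeSec - windowSec
    while (buffer.nonEmpty && buffer.head.procTimeSec < lowerBound) buffer.dequeue()
    // Emit (a, count(*)) for the triggering event.
    (e.a, buffer.size.toLong)
  }
}

object RollingProcTimeCount {
  // Parses "HH:MM:SS" into seconds since midnight.
  def toSec(hms: String): Long = {
    val parts = hms.split(":").map(_.toLong)
    parts(0) * 3600 + parts(1) * 60 + parts(2)
  }
}
```

Fed the six rows of the functionality example with `windowSec = 3600`, the sketch yields the counts 1, 2, 1, 2, 3, 3, matching the Output column there. A FIFO queue suffices only because rows are assumed to arrive in non-decreasing proctime order, so the oldest rows are always at the head; an event-time (rowtime) variant would have to handle out-of-order rows.
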
The behavior targeted here is the same, with the only exception that the window boundaries are defined by the WHERE conditions. Apart from this, the logic and the types of aggregates to be supported should be the same (sum, count, avg, min, max). Supporting this type of query is related to the "pie chart" problem tackled by Calcite.

As with the OVER windows, the construct should build rolling windows, i.e., windows that are triggered by and move with every incoming event.

Functionality example
---------------------

The example below illustrates the functionality of a time window defined in the WHERE clause when working with streams.

`SELECT a, count(*) FROM stream1 WHERE proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp;`

| IngestionTime (event) | Stream1 | Output (a, count) |
|---|---|---|
| 10:00:01 | Id1,10 | Id1,1 |
| 10:02:00 | Id2,2 | Id2,2 |
| 11:25:00 | Id3,2 | Id3,1 |
| 12:03:00 | Id4,15 | Id4,2 |
| 12:05:00 | Id5,11 | Id5,3 |
| 12:56:00 | Id6,20 | Id6,3 |
| ... | | |

Implementation option
---------------------

Since the query provides the same functionality as the aggregates over OVER windows, its implementation should follow the implementation of the OVER clause. Because the WHERE conditions are typically related to time, the [FLINK-5658](https://issues.apache.org/jira/browse/FLINK-5658) design should be used when one boundary is unbounded, while the [FLINK-5654](https://issues.apache.org/jira/browse/FLINK-5654) design should be used for bounded time windows.

The window boundaries will be extracted from the WHERE condition.

Such a query is no longer mapped to a LogicalWindow, which means that the conversion has to happen in the existing DataStreamCalcRule. To this end, a dedicated condition will be added so that, when the WHERE clause contains time conditions, the operator implementation of the OVER clause (used in the previous issues) is used.
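
The choice between the two OVER designs depends on which boundaries the extracted time condition limits. A minimal sketch of that decision in plain Scala follows. It assumes the boundaries have already been extracted into a simplified form, and that "one unbound boundary" means an open lower boundary. `TimeBounds`, `WindowPlan`, and `WindowPlanner.plan` are hypothetical names rather than Calcite or Flink APIs; a real rule would have to analyze the RexNode condition of the Calc program instead.

```scala
// Hypothetical model of the time boundaries extracted from a WHERE clause.
// Offsets are in milliseconds before current_timestamp (0 = current_timestamp);
// None means the condition sets no limit on that side.
final case class TimeBounds(lowerOffsetMs: Option[Long], upperOffsetMs: Option[Long])

// Which implementation the rule would pick (hypothetical).
sealed trait WindowPlan
// Both boundaries limited: bounded time window (FLINK-5654 design).
final case class BoundedWindow(sizeMs: Long) extends WindowPlan
// Lower boundary open: unbounded window (FLINK-5658 design).
case object UnboundedWindow extends WindowPlan
// No time condition: keep the plain DataStreamCalc.
case object PlainCalc extends WindowPlan

object WindowPlanner {
  def plan(bounds: Option[TimeBounds]): WindowPlan = bounds match {
    case None => PlainCalc
    case Some(TimeBounds(Some(lower), Some(upper))) =>
      // The lower boundary must not lie after the upper one.
      require(lower >= upper, s"empty window: lower offset $lower < upper offset $upper")
      BoundedWindow(lower - upper)
    case Some(TimeBounds(None, Some(_))) => UnboundedWindow
    case Some(other) =>
      throw new IllegalArgumentException(s"unsupported time condition: $other")
  }
}
```

Under these assumptions, the example condition `proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp` corresponds to `TimeBounds(Some(3600000L), Some(0L))` and is planned as `BoundedWindow(3600000L)`, a one-hour bounded window. A query without any time condition falls back to `PlainCalc`.
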
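```scala
// Assumed imports (Flink 1.2-era package layout).
import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.LogicalCalc
import org.apache.calcite.rex.RexProgram
import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, DataStreamConvention}

class DataStreamCalcRule
  extends ConverterRule(
    classOf[LogicalCalc],
    Convention.NONE,
    DataStreamConvention.INSTANCE,
    "DataStreamCalcRule") {

  def convert(rel: RelNode): RelNode = {
    val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
    val convInput: RelNode = RelOptRule.convert(calc.getInput, DataStreamConvention.INSTANCE)

    if (hasTimeLimits(calc.getProgram)) {
      // Proposed: the WHERE clause contains time limits, so use the OVER operators.
      if (isBounded(calc.getProgram)) {
        // Bounded time window (FLINK-5654 design); constructor not defined yet.
        ??? // new DataStreamProcTimeTimeAggregate(...)
      } else {
        // One unbounded boundary (FLINK-5658 design); constructor not defined yet.
        ??? // new DataStreamSlideEventTimeRowAgg(...)
      }
    } else {
      // No time limits: plain projection/filter, as in the current rule.
      new DataStreamCalc(
        rel.getCluster,
        traitSet,
        convInput,
        rel.getRowType,
        calc.getProgram,
        description)
    }
  }

  // Hypothetical helpers (not yet implemented): inspect the Calc program's
  // condition for time limits such as
  // proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp.
  private def hasTimeLimits(program: RexProgram): Boolean = ???
  private def isBounded(program: RexProgram): Boolean = ???
}
```

-- This message was sent by Atlassian JIRA (v6.3.15#6346)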