[ https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548960#comment-15548960 ]
ASF GitHub Bot commented on FLINK-4691:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2562#discussion_r81847736

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
    @@ -675,11 +692,77 @@ class GroupedTable(
         * Example:
         *
         * {{{
    -    *   tab.groupBy("key").select("key, value.avg + " The average" as average")
    +    *   tab.groupBy("key").select("key, value.avg + ' The average' as average")
    +    * }}}
    +    */
    +  def select(fields: String): Table = {
    +    val fieldExprs = ExpressionParser.parseExpressionList(fields)
    +    select(fieldExprs: _*)
    +  }
    +
    +  /**
    +    * Windows a table to divide a (potentially) infinite stream of records into finite slices
    +    * based on the timestamps of elements or other criteria. This division is required when
    +    * working with infinite data and performing transformations that aggregate elements.
    +    *
    +    * @param groupWindow group-window specification required to bound the infinite input stream
    +    *                    into a finite group
    +    * @return group-windowed table
    +    */
    +  def window(groupWindow: GroupWindow): GroupWindowedTable = {
    +    if (table.tableEnv.isInstanceOf[BatchTableEnvironment]) {
    +      throw new ValidationException(s"Windows on batch tables are currently not supported.")
    +    }
    +    new GroupWindowedTable(table, groupKey, groupWindow)
    +  }
    +}
    +
    +class GroupWindowedTable(
    +    private[flink] val table: Table,
    +    private[flink] val groupKey: Seq[Expression],
    +    private[flink] val window: GroupWindow) {
    +
    +  /**
    +    * Performs a selection operation on a group-windowed table. Similar to an SQL SELECT statement.
    +    * The field expressions can contain complex expressions and aggregations.
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   groupWindowTable.select('key, 'window.start, 'value.avg + " The average" as 'average)
    +    * }}}
    +    */
    +  def select(fields: Expression*): Table = {
    +    val projectionOnAggregates = fields.map(extractAggregations(_, table.tableEnv))
    +    val aggregations = projectionOnAggregates.flatMap(_._2)
    +
    +    val groupWindow = window.toLogicalWindow
    +
    +    val logical = if (aggregations.nonEmpty) {
    +      Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)),
    +        WindowAggregate(groupKey, groupWindow, aggregations, table.logicalPlan)
    +          .validate(table.tableEnv))
    +    } else {
    +      Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)),
    +        WindowAggregate(groupKey, groupWindow, Nil, table.logicalPlan).validate(table.tableEnv))
    +    }
    +
    +    new Table(table.tableEnv, logical.validate(table.tableEnv))
    --- End diff --

    Wasn't `logical` validated before?

> Add group-windows for streaming tables
> ---------------------------------------
>
>                 Key: FLINK-4691
>                 URL: https://issues.apache.org/jira/browse/FLINK-4691
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>
> Add Tumble, Slide, Session group-windows for streaming tables as described in
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>
> Implementation of group-windows on streaming tables. This includes
> implementing the API of group-windows, the logical validation for
> group-windows, and the definition of the “rowtime” and “systemtime” keywords.
> Group-windows on batch tables won’t be initially supported and will throw an
> exception.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)