[ 
https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548960#comment-15548960
 ] 

ASF GitHub Bot commented on FLINK-4691:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2562#discussion_r81847736
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
    @@ -675,11 +692,77 @@ class GroupedTable(
         * Example:
         *
         * {{{
    -    *   tab.groupBy("key").select("key, value.avg + " The average" as 
average")
    +    *   tab.groupBy("key").select("key, value.avg + ' The average' as 
average")
    +    * }}}
    +    */
    +  def select(fields: String): Table = {
    +    val fieldExprs = ExpressionParser.parseExpressionList(fields)
    +    select(fieldExprs: _*)
    +  }
    +
    +  /**
    +    * Windows a table to divide a (potentially) infinite stream of records 
into finite slices
    +    * based on the timestamps of elements or other criteria. This division 
is required when
    +    * working with infinite data and performing transformations that 
aggregate elements.
    +    *
    +    * @param groupWindow group-window specification required to bound the 
infinite input stream
    +    *                    into a finite group
    +    * @return group-windowed table
    +    */
    +  def window(groupWindow: GroupWindow): GroupWindowedTable = {
    +    if (table.tableEnv.isInstanceOf[BatchTableEnvironment]) {
    +      throw new ValidationException(s"Windows on batch tables are 
currently not supported.")
    +    }
    +    new GroupWindowedTable(table, groupKey, groupWindow)
    +  }
    +}
    +
    +class GroupWindowedTable(
    +    private[flink] val table: Table,
    +    private[flink] val groupKey: Seq[Expression],
    +    private[flink] val window: GroupWindow) {
    +
    +  /**
    +    * Performs a selection operation on a group-windowed table. Similar to 
an SQL SELECT statement.
    +    * The field expressions can contain complex expressions and 
aggregations.
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   groupWindowTable.select('key, 'window.start, 'value.avg + " The 
average" as 'average)
    +    * }}}
    +    */
    +  def select(fields: Expression*): Table = {
    +    val projectionOnAggregates = fields.map(extractAggregations(_, 
table.tableEnv))
    +    val aggregations = projectionOnAggregates.flatMap(_._2)
    +
    +    val groupWindow = window.toLogicalWindow
    +
    +    val logical = if (aggregations.nonEmpty) {
    +      Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)),
    +        WindowAggregate(groupKey, groupWindow, aggregations, 
table.logicalPlan)
    +          .validate(table.tableEnv))
    +    } else {
    +      Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)),
    +        WindowAggregate(groupKey, groupWindow, Nil, 
table.logicalPlan).validate(table.tableEnv))
    +    }
    +
    +    new Table(table.tableEnv, logical.validate(table.tableEnv))
    --- End diff --
    
    Wasn't `logical` already validated above? Both branches already call `.validate(table.tableEnv)` on the `WindowAggregate`.


> Add group-windows for streaming tables        
> ---------------------------------------
>
>                 Key: FLINK-4691
>                 URL: https://issues.apache.org/jira/browse/FLINK-4691
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>
> Add Tumble, Slide, Session group-windows for streaming tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  
> This sub-task implements group-windows on streaming tables. This includes 
> implementing the API of group-windows, the logical validation for 
> group-windows, and the definition of the “rowtime” and “systemtime” keywords. 
> Group-windows on batch tables will not be supported initially and will throw 
> an exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to