[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15977682#comment-15977682 ]
ASF GitHub Bot commented on FLINK-6228:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112571486

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala ---
    @@ -22,6 +22,126 @@ import org.apache.flink.table.expressions.{Expression, ExpressionParser}
     import org.apache.flink.table.plan.logical._

     /**
    +  * An over window specification.
    +  *
    +  * Over window is similar to the traditional OVER SQL.
    +  */
    +class OverWindow {
    +
    +  private[flink] var alias: Expression = _
    +  private[flink] var partitionBy: Seq[Expression] = Seq[Expression]()
    +  private[flink] var orderBy: Expression = _
    +  private[flink] var preceding: Expression = _
    +  private[flink] var following: Expression = null
    +
    +  /**
    +    * Assigns an alias for this window that the following `select()` clause can refer to.
    +    *
    +    * @param alias alias for this over window
    +    * @return this over window
    +    */
    +  def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))
    +
    +  /**
    +    * Assigns an alias for this window that the following `select()` clause can refer to.
    +    *
    +    * @param alias alias for this over window
    +    * @return this over window
    +    */
    +  def as(alias: Expression): OverWindow = {
    +    this.alias = alias
    +    this
    +  }
    +
    +  /**
    +    * Partitions the elements on some partition keys.
    +    *
    +    * @param partitionBy
    +    * @return this over window
    +    */
    +  def partitionBy(partitionBy: String): OverWindow = {
    +    this.partitionBy(ExpressionParser.parseExpression(partitionBy))
    +  }
    +
    +  /**
    +    * Partitions the elements on some partition keys.
    +    *
    +    * @param partitionBy
    +    * @return this over window
    +    */
    +  def partitionBy(partitionBy: Expression*): OverWindow = {
    +    this.partitionBy = partitionBy
    +    this
    +  }
    +
    +
    +  /**
    +    * Specifies the time mode.
    +    *
    +    * @param orderBy For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]]
    +    *                to specify time mode.
    +    * @return this over window
    +    */
    +  def orderBy(orderBy: String): OverWindow = {
    +    this.orderBy(ExpressionParser.parseExpression(orderBy))
    +  }
    +
    +  /**
    +    * Specifies the time mode.
    +    *
    +    * @param orderBy For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]]
    +    *                to specify time mode.
    +    * @return this over window
    +    */
    +  def orderBy(orderBy: Expression): OverWindow = {
    +    this.orderBy = orderBy
    +    this
    +  }
    +
    +  /**
    +    * Set the preceding offset (based on time or row-count intervals) for over window
    +    *
    +    * @param preceding forward offset that relative to the current row
    +    * @return this over window
    +    */
    +  def preceding(preceding: String): OverWindow = {
    +    this.preceding(ExpressionParser.parseExpression(preceding))
    +  }
    +
    +  /**
    +    * Set the preceding offset (based on time or row-count intervals) for over window
    +    *
    +    * @param preceding forward offset that relative to the current row
    +    * @return this over window
    +    */
    +  def preceding(preceding: Expression): OverWindow = {
    +    this.preceding = preceding
    +    this
    +  }
    +
    +  /**
    +    * Set the following offset (based on time or row-count intervals) for over window
    +    *
    +    * @param following subsequent offset that relative to the current row
    +    * @return this over window
    +    */
    +  def following(following: String): OverWindow = {
    --- End diff --

    Should we make `following` optional and default to CURRENT_ROW / CURRENT_RANGE depending on the type of `preceding`?
    I think that would be a nice shortcut and also be aligned with SQL.

    What do you think @sunjincheng121 ?

> Integrating the OVER windows in the Table API
> ---------------------------------------------
>
>                 Key: FLINK-6228
>                 URL: https://issues.apache.org/jira/browse/FLINK-6228
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> Syntax:
> {code}
> table
>   .overWindows(
>     (Rows|Range [ partitionBy value_expression , ... [ n ]] [ orderBy order_by_expression]
>       (preceding UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW)
>       [following UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW]
>     as alias,...[n])
>   )
>   .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, … [n])
> {code}
> Implement restrictions:
> * All OVER clauses in the same SELECT clause must be exactly the same.
> * The PARTITION BY clause is optional (no partitioning results in single threaded execution).
> * The ORDER BY Before the [FLINK-5884|https://issues.apache.org/jira/browse/FLINK-5884] implementation orderBy may only have ‘rowtime/’proctime(for stream)/‘specific-time-field(for batch).
> * FOLLOWING is not supported.
> User interface design document [See | https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit#]

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
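
For reference, the review suggestion above (make `following` optional and default it to CURRENT_ROW or CURRENT_RANGE depending on the type of `preceding`) could look roughly like the sketch below. This is only an illustration under stated assumptions: `Bound`, `RowCount`, `TimeInterval`, `OverWindowSketch`, `isRowBased` and `resolveFollowing` are hypothetical stand-ins invented for this example, not the classes or methods in the pull request.

```scala
// Hypothetical stand-ins for the Table API expressions; NOT Flink's actual classes.
sealed trait Bound
case object UnboundedRow extends Bound
case object UnboundedRange extends Bound
case object CurrentRow extends Bound
case object CurrentRange extends Bound
final case class RowCount(n: Long) extends Bound           // a row-count offset
final case class TimeInterval(millis: Long) extends Bound  // a time offset

object OverWindowSketch {

  /** True if the bound is row-count based, false if it is time (range) based. */
  def isRowBased(bound: Bound): Boolean = bound match {
    case UnboundedRow | CurrentRow | RowCount(_) => true
    case UnboundedRange | CurrentRange | TimeInterval(_) => false
  }

  /**
    * Returns the explicit `following` bound if the user set one; otherwise
    * derives the default from the kind of `preceding` bound.
    */
  def resolveFollowing(preceding: Bound, following: Option[Bound]): Bound =
    following.getOrElse(if (isRowBased(preceding)) CurrentRow else CurrentRange)
}
```

With something along these lines, a window that only sets a row-count `preceding` would end at the current row, and one that only sets a time-based `preceding` would end at the current range, which matches the SQL default the comment refers to.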