[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15977672#comment-15977672 ]
ASF GitHub Bot commented on FLINK-6228:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112559640

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }

     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg over-agg expression
    +  * @param aggAlias agg alias for following `select()` clause.
    +  * @param overWindowAlias over window alias
    +  * @param overWindow over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    var aggAlias: Expression,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  private[flink] def as(aggAlias: Expression): OverCall = {
    +    this.aggAlias = aggAlias
    +    this
    +  }
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +
    +    val rexBuilder = relBuilder.getRexBuilder
    +
    +    val operator: SqlAggFunction = agg.toSqlAggFunction()
    +
    +    val aggReturnType: TypeInformation[_] = agg.resultType
    +
    +    val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
    +
    +    val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
    +
    +    aggExprs.add(relBuilder.field(aggChildName))
    +
    +    val orderKeys: ImmutableList.Builder[RexFieldCollation] =
    +      new ImmutableList.Builder[RexFieldCollation]()
    +
    +    val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
    +    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +
    +    val rexNode =
    +      if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
    +        // for stream
    +        relBuilder.literal(orderName)
    --- End diff --

    Can we use `relBuilder.call(EventTimeExtractor)` and `relBuilder.call(ProcTimeExtractor)` here to make the logic identical to SQL?


> Integrating the OVER windows in the Table API
> ---------------------------------------------
>
>                 Key: FLINK-6228
>                 URL: https://issues.apache.org/jira/browse/FLINK-6228
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> Syntax:
> {code}
> table
>   .overWindows(
>     (Rows|Range [ partitionBy value_expression , ... [ n ]] [ orderBy order_by_expression]
>       (preceding UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW)
>       [following UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW]
>     as alias,...[n])
>   )
>   .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, … [n])
> {code}
> Implementation restrictions:
> * All OVER clauses in the same SELECT clause must be exactly the same.
> * The PARTITION BY clause is optional (no partitioning results in single-threaded execution).
> * ORDER BY: until [FLINK-5884|https://issues.apache.org/jira/browse/FLINK-5884] is implemented, orderBy may only reference 'rowtime or 'proctime (for stream), or 'specific-time-field (for batch).
> * FOLLOWING is not supported.
> User interface design document: [See|https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit#]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)