[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15977681#comment-15977681 ]
ASF GitHub Bot commented on FLINK-6228:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112570095

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }

     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg over-agg expression
    +  * @param aggAlias agg alias for following `select()` clause.
    +  * @param overWindowAlias over window alias
    +  * @param overWindow over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    var aggAlias: Expression,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  private[flink] def as(aggAlias: Expression): OverCall = {
    +    this.aggAlias = aggAlias
    +    this
    +  }
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +
    +    val rexBuilder = relBuilder.getRexBuilder
    +
    +    val operator: SqlAggFunction = agg.toSqlAggFunction()
    +
    +    val aggReturnType: TypeInformation[_] = agg.resultType
    +
    +    val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
    +
    +    val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
    +
    +    aggExprs.add(relBuilder.field(aggChildName))
    +
    +    val orderKeys: ImmutableList.Builder[RexFieldCollation] =
    +      new ImmutableList.Builder[RexFieldCollation]()
    +
    +    val sets: util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
    +    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +
    +    val rexNode =
    +      if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
    +        // for stream
    +        relBuilder.literal(orderName)
    +      } else {
    +        // for batch
    +        relBuilder.field(orderName)
    +      }
    +
    +    orderKeys.add(new RexFieldCollation(rexNode, sets))
    +
    +    val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    overWindow.partitionBy.foreach(x =>
    +      partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)))
    +
    +    val preceding = overWindow.preceding.asInstanceOf[Literal]
    +    val following = overWindow.following.asInstanceOf[Literal]
    +
    +    val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
    +
    +    val lowerBound = createBound(relBuilder, preceding.value.asInstanceOf[Long], SqlKind.PRECEDING)
    +    val upperBound = createBound(relBuilder, following.value.asInstanceOf[Long], SqlKind.FOLLOWING)
    +
    +    rexBuilder.makeOver(
    +      relDataType,
    +      operator,
    +      aggExprs,
    +      partitionKeys,
    +      orderKeys.build,
    +      lowerBound,
    +      upperBound,
    +      isPhysical,
    +      true,
    +      false)
    +  }
    +
    +  private def createBound(
    +      relBuilder: RelBuilder,
    +      precedingValue: Long,
    +      sqlKind: SqlKind): RexWindowBound = {
    +
    +    if (precedingValue == Long.MaxValue) {
    +      val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
    +      create(unbounded, null)
    +    } else if (precedingValue == 0L) {
    +      val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
    +      create(currentRow, null)
    +    } else {
    +
    +      val returnType = new BasicSqlType(
    +        relBuilder.getTypeFactory.getTypeSystem,
    +        SqlTypeName.DECIMAL)
    +
    +      val sqlOperator = new SqlPostfixOperator(
    +        sqlKind.name,
    +        sqlKind,
    +        2,
    +        new OrdinalReturnTypeInference(0),
    +        null,
    +        null)
    +
    +      val operands: Array[SqlNode] = new Array[SqlNode](1)
    +      operands(0) = SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO)
    +
    +      val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)
    +
    +      val expressions: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +      expressions.add(relBuilder.literal(precedingValue))
    +
    +      val rexNode = relBuilder.getRexBuilder.makeCall(returnType, sqlOperator, expressions)
    +
    +      create(node, rexNode)
    +    }
    +  }
    +
    +  override private[flink] def children: Seq[Expression] = Seq()
    --- End diff --

    maybe we can check the partitionBy expressions here as well?


> Integrating the OVER windows in the Table API
> ---------------------------------------------
>
>                 Key: FLINK-6228
>                 URL: https://issues.apache.org/jira/browse/FLINK-6228
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> Syntax:
> {code}
> table
>   .overWindows(
>     (Rows|Range [ partitionBy value_expression, ... [n] ] [ orderBy order_by_expression ]
>       (preceding UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW)
>       [following UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW]
>     as alias, ... [n])
>   )
>   .select([col1, ... [n]], (agg(col1) OVER overWindowAlias, ... [n]))
> {code}
> Implementation restrictions:
> * All OVER clauses in the same SELECT clause must be exactly the same.
> * The PARTITION BY clause is optional (no partitioning results in single-threaded execution).
> * The ORDER BY clause: until [FLINK-5884|https://issues.apache.org/jira/browse/FLINK-5884] is implemented, orderBy may only reference rowtime/proctime (for stream) or a specific time field (for batch).
> * FOLLOWING is not supported.
> User interface design document: [See|https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit#]
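To make the proposed syntax above concrete, here is a minimal usage sketch in Scala. It only mirrors the grammar quoted from the issue description; the expression names (`overWindows`, `partitionBy`, `orderBy`, `preceding`, `UNBOUNDED`, `over`) are taken from that description and the linked design document and are assumptions about the eventual API, not its final form.

{code}
// Sketch only: one row-count OVER window, partitioned by 'a and ordered by 'rowtime,
// aggregating from the first preceding row up to the current row.
// All aggregates in the select() share the same window alias, per the restrictions above.
val result = table
  .overWindows(
    Rows partitionBy 'a orderBy 'rowtime preceding UNBOUNDED as 'w)
  .select('a, 'b.sum over 'w, 'c.count over 'w)
{code}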
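Regarding the review suggestion above on `children`: a minimal sketch of how the partitionBy (and orderBy) expressions could be exposed as children so that field resolution and validation cover them together with the aggregate. It assumes `overWindow.partitionBy` is a `Seq[Expression]` and `overWindow.orderBy` an `Expression`, as the `toRexNode` code in the diff suggests.

{code}
// Sketch only: include the aggregate and the over-window keys in children so the
// validation layer resolves and checks them, instead of returning an empty Seq.
override private[flink] def children: Seq[Expression] =
  Seq(agg) ++ overWindow.partitionBy :+ overWindow.orderBy
{code}

Note that `overWindow` has a null default in the quoted code, so such a `children` implementation would only be safe once the window has been assigned (or if the null case is handled explicitly).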