[ https://issues.apache.org/jira/browse/FLINK-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16067946#comment-16067946 ]
ASF GitHub Bot commented on FLINK-6584: --------------------------------------- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4199#discussion_r124725025 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala --- @@ -54,34 +56,75 @@ class WindowStartEndPropertiesRule val project = call.rel(0).asInstanceOf[LogicalProject] val innerProject = call.rel(1).asInstanceOf[LogicalProject] val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate] + val window = agg.getWindow - // Retrieve window start and end properties + val isRowtime = isRowtimeAttribute(window.timeAttribute) + val isProctime = isProctimeAttribute(window.timeAttribute) + + val startEndProperties = Seq( + NamedWindowProperty("w$start", WindowStart(window.aliasAttribute)), + NamedWindowProperty("w$end", WindowEnd(window.aliasAttribute))) + + // allow rowtime/proctime for rowtime windows and proctime for proctime windows + val timeProperties = if (isRowtime) { + Seq( + NamedWindowProperty("w$rowtime", RowtimeAttribute(window.aliasAttribute)), + NamedWindowProperty("w$proctime", ProctimeAttribute(window.aliasAttribute))) + } else if (isProctime) { + Seq(NamedWindowProperty("w$proctime", ProctimeAttribute(window.aliasAttribute))) + } else { + Seq() + } + + val properties = startEndProperties ++ timeProperties + + // retrieve window start and end properties val transformed = call.builder() val rexBuilder = transformed.getRexBuilder transformed.push(LogicalWindowAggregate.create( - agg.getWindow, - Seq( - NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)), - NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute)) - ), agg) + window, + properties, + agg) ) // forward window start and end properties transformed.project( - innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end"))) + innerProject.getProjects ++ properties.map(np => transformed.field(np.name))) def replaceGroupAuxiliaries(node: RexNode): RexNode = { node match { case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) => // replace expression by access to window start rexBuilder.makeCast(c.getType, transformed.field("w$start"), false) + case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) => // replace expression by access to window end rexBuilder.makeCast(c.getType, transformed.field("w$end"), false) + + case c: RexCall if WindowStartEndPropertiesRule.isWindowRowtime(c) => + if (isProctime) { + throw ValidationException("A proctime window cannot provide a rowtime attribute.") + } else if (isRowtime) { + // replace expression by access to window rowtime + transformed.field("w$rowtime") + } else { + throw TableException("Accessing the rowtime attribute of a window is not yet " + + "supported in a batch environment.") + } + + case c: RexCall if WindowStartEndPropertiesRule.isWindowProctime(c) => + if (isProctime) { --- End diff -- add a `isRowtime` condition branch? > Support multiple consecutive windows in SQL > ------------------------------------------- > > Key: FLINK-6584 > URL: https://issues.apache.org/jira/browse/FLINK-6584 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Timo Walther > Assignee: Timo Walther > > Right now, the Table API supports multiple consecutive windows as follows: > {code} > val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, > 'bigdec, 'string) > val t = table > .window(Tumble over 2.millis on 'rowtime as 'w) > .groupBy('w) > .select('w.rowtime as 'rowtime, 'int.count as 'int) > .window(Tumble over 4.millis on 'rowtime as 'w2) > .groupBy('w2) > .select('w2.rowtime, 'w2.end, 'int.count) > {code} > Similar behavior should be supported by the SQL API as well. We need to > introduce a new auxiliary group function, but this should happen in sync with > Apache Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)