KurtYoung commented on a change in pull request #8244: [FLINK-11945] [table-runtime-blink] Support over aggregation for blink streaming runtime URL: https://github.com/apache/flink/pull/8244#discussion_r283600244
########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecOverAggregate.scala ########## @@ -134,6 +144,325 @@ class StreamExecOverAggregate( override protected def translateToPlanInternal( tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = { - throw new TableException("Implements this") + val tableConfig = tableEnv.getConfig + + if (logicWindow.groups.size > 1) { + throw new TableException( + "All aggregates must be computed on the same window.") + } + + val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0) + + val orderKeys = overWindow.orderKeys.getFieldCollations + + if (orderKeys.size() != 1) { + throw new TableException( + "The window can only be ordered by a single time column.") + } + val orderKey = orderKeys.get(0) + + if (!orderKey.direction.equals(ASCENDING)) { + throw new TableException( + "The window can only be ordered in ASCENDING mode.") + } + + val inputDS = getInputNodes.get(0).translateToPlan(tableEnv) + .asInstanceOf[StreamTransformation[BaseRow]] + + val inputIsAccRetract = StreamExecRetractionRules.isAccRetract(input) + + if (inputIsAccRetract) { + throw new TableException( + "Retraction on Over window aggregation is not supported yet. " + + "Note: Over window aggregation should not follow a non-windowed GroupBy aggregation.") + } + + if (!logicWindow.groups.get(0).keys.isEmpty && tableConfig.getMinIdleStateRetentionTime < 0) { + LOG.warn( + "No state retention interval configured for a query which accumulates state. " + + "Please provide a query configuration with valid retention interval to prevent " + + "excessive state size. You may specify a retention time of 0 to not clean up the state.") + } + + val timeType = outputRowType.getFieldList.get(orderKey.getFieldIndex).getType + + // check time field + if (!FlinkTypeFactory.isRowtimeIndicatorType(timeType) + && !FlinkTypeFactory.isProctimeIndicatorType(timeType)) { + throw new TableException( + "OVER windows' ordering in stream mode must be defined on a time attribute.") + } + + // identify window rowtime attribute + val rowTimeIdx: Option[Int] = if (FlinkTypeFactory.isRowtimeIndicatorType(timeType)) { + Some(orderKey.getFieldIndex) + } else if (FlinkTypeFactory.isProctimeIndicatorType(timeType)) { + None + } else { + throw new TableException( + "OVER windows can only be applied on time attributes.") + } + + val config = tableEnv.getConfig Review comment: already got table config in line 147 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services