KurtYoung commented on a change in pull request #8244: [FLINK-11945] 
[table-runtime-blink] Support over aggregation for blink streaming runtime
URL: https://github.com/apache/flink/pull/8244#discussion_r283600244
 
 

 ##########
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecOverAggregate.scala
 ##########
 @@ -134,6 +144,325 @@ class StreamExecOverAggregate(
 
   override protected def translateToPlanInternal(
       tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
-    throw new TableException("Implements this")
+    val tableConfig = tableEnv.getConfig
+
+    if (logicWindow.groups.size > 1) {
+      throw new TableException(
+          "All aggregates must be computed on the same window.")
+    }
+
+    val overWindow: org.apache.calcite.rel.core.Window.Group = 
logicWindow.groups.get(0)
+
+    val orderKeys = overWindow.orderKeys.getFieldCollations
+
+    if (orderKeys.size() != 1) {
+      throw new TableException(
+          "The window can only be ordered by a single time column.")
+    }
+    val orderKey = orderKeys.get(0)
+
+    if (!orderKey.direction.equals(ASCENDING)) {
+      throw new TableException(
+          "The window can only be ordered in ASCENDING mode.")
+    }
+
+    val inputDS = getInputNodes.get(0).translateToPlan(tableEnv)
+      .asInstanceOf[StreamTransformation[BaseRow]]
+
+    val inputIsAccRetract = StreamExecRetractionRules.isAccRetract(input)
+
+    if (inputIsAccRetract) {
+      throw new TableException(
+          "Retraction on Over window aggregation is not supported yet. " +
+            "Note: Over window aggregation should not follow a non-windowed 
GroupBy aggregation.")
+    }
+
+    if (!logicWindow.groups.get(0).keys.isEmpty && 
tableConfig.getMinIdleStateRetentionTime < 0) {
+      LOG.warn(
+        "No state retention interval configured for a query which accumulates 
state. " +
+          "Please provide a query configuration with valid retention interval 
to prevent " +
+          "excessive state size. You may specify a retention time of 0 to not 
clean up the state.")
+    }
+
+    val timeType = 
outputRowType.getFieldList.get(orderKey.getFieldIndex).getType
+
+    // check time field
+    if (!FlinkTypeFactory.isRowtimeIndicatorType(timeType)
+      && !FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+      throw new TableException(
+        "OVER windows' ordering in stream mode must be defined on a time 
attribute.")
+    }
+
+    // identify window rowtime attribute
+    val rowTimeIdx: Option[Int] = if 
(FlinkTypeFactory.isRowtimeIndicatorType(timeType)) {
+      Some(orderKey.getFieldIndex)
+    } else if (FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+      None
+    } else {
+      throw new TableException(
+          "OVER windows can only be applied on time attributes.")
+    }
+
+    val config = tableEnv.getConfig
 
 Review comment:
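   The table config was already obtained on line 147 (`val tableConfig = tableEnv.getConfig`); please reuse `tableConfig` here instead of calling `tableEnv.getConfig` again.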

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to