[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15977676#comment-15977676 ]
ASF GitHub Bot commented on FLINK-6228:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112499806

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -93,28 +96,43 @@ class DataStreamOverAggregate(

         val orderKeys = overWindow.orderKeys.getFieldCollations

    -    if (orderKeys.size() != 1) {
    -      throw new TableException(
    -        "Unsupported use of OVER windows. The window can only be ordered by a single time column.")
    -    }
    -    val orderKey = orderKeys.get(0)
    +    val timeType = if (!orderKeys.isEmpty) {
    +      if (orderKeys.size() != 1) {
    +        throw new TableException(
    +          "Unsupported use of OVER windows. The window can only be ordered by a single time " +
    +            "column.")
    +      }
    +      val orderKey = orderKeys.get(0)

    -    if (!orderKey.direction.equals(ASCENDING)) {
    -      throw new TableException(
    -        "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.")
    +      if (!orderKey.direction.equals(ASCENDING)) {
    +        throw new TableException(
    +          "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.")
    +      }
    +      inputType
    +        .getFieldList
    +        .get(orderKey.getFieldIndex)
    +        .getValue.asInstanceOf[TimeModeType]
    +    } else {
    +      val it = logicWindow.constants.listIterator()
    +      if (it.hasNext) {
    +        val item = it.next().getValue
    +        if (item.isInstanceOf[NlsString]) {
    +          val value = item.asInstanceOf[NlsString].getValue
    +          if (value.equalsIgnoreCase("rowtime")) {
    +            new RowTimeType
    +          } else {
    +            new ProcTimeType
    +          }
    +        }
    +      }
         }

         val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)

         val generator = new CodeGenerator(
    -      tableEnv.getConfig,
    -      false,
    -      inputDS.getType)
    -
    -    val timeType = inputType
    -      .getFieldList
    -      .get(orderKey.getFieldIndex)
    -      .getValue
    +    tableEnv.getConfig,
    --- End diff --

    indent


> Integrating the OVER windows in the Table API
> ---------------------------------------------
>
>                 Key: FLINK-6228
>                 URL: https://issues.apache.org/jira/browse/FLINK-6228
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> Syntax:
> {code}
> table
>   .overWindows(
>     (Rows|Range [ partitionBy value_expression , ... [ n ]] [ orderBy order_by_expression]
>       (preceding UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW)
>       [following UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW]
>     as alias,...[n])
>   )
>   .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, … [n])
> {code}
> Implementation restrictions:
> * All OVER clauses in the same SELECT clause must be exactly the same.
> * The PARTITION BY clause is optional (no partitioning results in single-threaded execution).
> * ORDER BY: until [FLINK-5884|https://issues.apache.org/jira/browse/FLINK-5884] is implemented, orderBy may only be 'rowtime or 'proctime (for stream) or 'specific-time-field (for batch).
> * FOLLOWING is not supported.
> User interface design document [See | https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit#]

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
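
For readers unfamiliar with OVER windows, the restrictions quoted above (optional partitioning, ascending time order, a frame from UNBOUNDED preceding to the current row, no FOLLOWING) can be illustrated with a minimal plain-Scala sketch. This is not Flink code and does not use the Table API; `Event`, `OverWindowSketch` and `runningSum` are hypothetical names introduced only for this illustration.

```scala
// Plain-Scala sketch (no Flink dependency) of an OVER window whose frame runs
// from UNBOUNDED preceding to the current row.
// Event, OverWindowSketch and runningSum are hypothetical names for illustration.
final case class Event(key: String, rowtime: Long, value: Int)

object OverWindowSketch {
  // For every row, emit the sum of `value` over all rows of the same partition
  // from the first row up to and including the current row, with rows ordered
  // by ascending rowtime. No FOLLOWING rows are ever included.
  def runningSum(events: Seq[Event]): Seq[(Event, Int)] =
    events
      .groupBy(_.key)                          // partitionBy key
      .toSeq
      .sortBy(_._1)                            // deterministic partition order for the output
      .flatMap { case (_, rows) =>
        val ordered = rows.sortBy(_.rowtime)   // orderBy rowtime, ASCENDING
        val sums = ordered.scanLeft(0)(_ + _.value).tail
        ordered.zip(sums)
      }
}
```

Unlike a group aggregate, each input row yields exactly one output row carrying the aggregate of its frame. A real streaming implementation would compute this incrementally from state rather than by sorting a finished collection.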