[ https://issues.apache.org/jira/browse/FLINK-6011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956518#comment-15956518 ]
ASF GitHub Bot commented on FLINK-6011: --------------------------------------- Github user haohui commented on a diff in the pull request: https://github.com/apache/flink/pull/3665#discussion_r109857875 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala --- @@ -117,46 +119,86 @@ class LogicalWindowAggregateRule } private def identifyWindow(field: RexNode): Option[Window] = { - // Detects window expressions by pattern matching - // supported patterns: FLOOR(time AS xxx) and CEIL(time AS xxx), - // with time being equal to proctime() or rowtime() field match { case call: RexCall => call.getOperator match { - case _: SqlFloorFunction => - val operand = call.getOperands.get(1).asInstanceOf[RexLiteral] - val unit: TimeUnitRange = operand.getValue.asInstanceOf[TimeUnitRange] - val w = LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit) - call.getType match { - case TimeModeTypes.PROCTIME => - return Some(w) - case TimeModeTypes.ROWTIME => - return Some(w.on("rowtime")) - case _ => - } - case _ => + case _: SqlFloorFunction => FloorWindowTranslator(call).toWindow + case SqlStdOperatorTable.TUMBLE => TumbleWindowTranslator(call).toWindow + case SqlStdOperatorTable.HOP => SlidingWindowTranslator(call).toWindow + case SqlStdOperatorTable.SESSION => SessionWindowTranslator(call).toWindow + case _ => None } - case _ => + case _ => None } - None } - } -object LogicalWindowAggregateRule { +private abstract class WindowTranslator { + val call: RexCall - private[flink] val LOGICAL_WINDOW_PREDICATE = RelOptRule.operand(classOf[LogicalAggregate], - RelOptRule.operand(classOf[LogicalProject], RelOptRule.none())) + protected def unwrapLiteral[T](node: RexNode): T = + node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T] - private[flink] val INSTANCE = new LogicalWindowAggregateRule + protected def getOperandAsLong(idx: Int): Long = + unwrapLiteral[BigDecimal](call.getOperands.get(idx)).longValue() --- End diff -- Agree. 97d1a45 will throw an `TableException` if the configuration is not fixed. > Support TUMBLE, HOP, SESSION window in streaming SQL > ---------------------------------------------------- > > Key: FLINK-6011 > URL: https://issues.apache.org/jira/browse/FLINK-6011 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Haohui Mai > Assignee: Haohui Mai > > CALCITE-1603 and CALCITE-1615 introduces the support of the {{TUMBLE}} / > {{HOP}} / {{SESSION}} windows in the parser. > This jira tracks the efforts of adding the corresponding supports on the > planners / optimizers in Flink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)