[ https://issues.apache.org/jira/browse/FLINK-6011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956507#comment-15956507 ]
ASF GitHub Bot commented on FLINK-6011: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3665#discussion_r109856293 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala --- @@ -117,46 +119,86 @@ class LogicalWindowAggregateRule } private def identifyWindow(field: RexNode): Option[Window] = { - // Detects window expressions by pattern matching - // supported patterns: FLOOR(time AS xxx) and CEIL(time AS xxx), - // with time being equal to proctime() or rowtime() field match { case call: RexCall => call.getOperator match { - case _: SqlFloorFunction => - val operand = call.getOperands.get(1).asInstanceOf[RexLiteral] - val unit: TimeUnitRange = operand.getValue.asInstanceOf[TimeUnitRange] - val w = LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit) - call.getType match { - case TimeModeTypes.PROCTIME => - return Some(w) - case TimeModeTypes.ROWTIME => - return Some(w.on("rowtime")) - case _ => - } - case _ => + case _: SqlFloorFunction => FloorWindowTranslator(call).toWindow + case SqlStdOperatorTable.TUMBLE => TumbleWindowTranslator(call).toWindow + case SqlStdOperatorTable.HOP => SlidingWindowTranslator(call).toWindow + case SqlStdOperatorTable.SESSION => SessionWindowTranslator(call).toWindow + case _ => None } - case _ => + case _ => None } - None } - } -object LogicalWindowAggregateRule { +private abstract class WindowTranslator { + val call: RexCall - private[flink] val LOGICAL_WINDOW_PREDICATE = RelOptRule.operand(classOf[LogicalAggregate], - RelOptRule.operand(classOf[LogicalProject], RelOptRule.none())) + protected def unwrapLiteral[T](node: RexNode): T = + node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T] - private[flink] val INSTANCE = new LogicalWindowAggregateRule + protected def getOperandAsLong(idx: Int): Long = + 
unwrapLiteral[BigDecimal](call.getOperands.get(idx)).longValue() --- End diff -- Flink only supports windows with a fixed configuration (SESSION windows have variable length, but the gap parameter is fixed). I'm also not sure if that would make sense. It's quite hard to reason about the behavior of a window with variable parameters, IMO. > Support TUMBLE, HOP, SESSION window in streaming SQL > ---------------------------------------------------- > > Key: FLINK-6011 > URL: https://issues.apache.org/jira/browse/FLINK-6011 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Haohui Mai > Assignee: Haohui Mai > > CALCITE-1603 and CALCITE-1615 introduce support for the {{TUMBLE}} / > {{HOP}} / {{SESSION}} windows in the parser. > This jira tracks the effort of adding the corresponding support to the > planners / optimizers in Flink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)