Github user haohui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3665#discussion_r109810006
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
 ---
    @@ -117,46 +119,86 @@ class LogicalWindowAggregateRule
       }
     
       private def identifyWindow(field: RexNode): Option[Window] = {
    -    // Detects window expressions by pattern matching
    -    //   supported patterns: FLOOR(time AS xxx) and CEIL(time AS xxx),
    -    //   with time being equal to proctime() or rowtime()
         field match {
           case call: RexCall =>
             call.getOperator match {
    -          case _: SqlFloorFunction =>
    -            val operand = call.getOperands.get(1).asInstanceOf[RexLiteral]
    -            val unit: TimeUnitRange = 
operand.getValue.asInstanceOf[TimeUnitRange]
    -            val w = 
LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit)
    -            call.getType match {
    -              case TimeModeTypes.PROCTIME =>
    -                return Some(w)
    -              case TimeModeTypes.ROWTIME =>
    -                return Some(w.on("rowtime"))
    -              case _ =>
    -            }
    -          case _ =>
    +          case _: SqlFloorFunction => FloorWindowTranslator(call).toWindow
    +          case SqlStdOperatorTable.TUMBLE => 
TumbleWindowTranslator(call).toWindow
    +          case SqlStdOperatorTable.HOP => 
SlidingWindowTranslator(call).toWindow
    +          case SqlStdOperatorTable.SESSION => 
SessionWindowTranslator(call).toWindow
    +          case _ => None
             }
    -      case _ =>
    +      case _ => None
         }
    -    None
       }
    -
     }
     
    -object LogicalWindowAggregateRule {
    +private abstract class WindowTranslator {
    +  val call: RexCall
     
    -  private[flink] val LOGICAL_WINDOW_PREDICATE = 
RelOptRule.operand(classOf[LogicalAggregate],
    -    RelOptRule.operand(classOf[LogicalProject], RelOptRule.none()))
    +  protected def unwrapLiteral[T](node: RexNode): T =
    +    node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T]
     
    -  private[flink] val INSTANCE = new LogicalWindowAggregateRule
    +  protected def getOperandAsLong(idx: Int): Long =
    +    unwrapLiteral[BigDecimal](call.getOperands.get(idx)).longValue()
    --- End diff --
    
    I just tried it out: Calcite does not stop you from passing something like a 
{{RexCall}}. So yes, it can be dynamic.
    
    One question: does Flink actually support a `GroupWindow` that has a 
dynamic size? Maybe I'm wrong, but it does not seem so.
    
    If the answer is no, maybe we should check whether it is a `RexLiteral`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to