wuchong commented on a change in pull request #15195:
URL: https://github.com/apache/flink/pull/15195#discussion_r593905932



##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala
##########
@@ -315,6 +315,38 @@ class FlinkRelMdWindowProperties private extends 
MetadataHandler[FlinkMetadata.W
     fmq.getRelWindowProperties(rel.getLeft)
   }
 
+  def getWindowProperties(
+      rel: StreamPhysicalWindowJoin,
+      mq: RelMetadataQuery): RelWindowProperties = {
+    val leftFieldCnt = rel.getLeft.getRowType.getFieldCount
+    val rightFieldCnt = rel.getRight.getRowType.getFieldCount
+
+    def inferWindowPropertyAfterWindowJoin(
+        leftWindowProperty: ImmutableBitSet,
+        rightWindowProperty: ImmutableBitSet): ImmutableBitSet = {
+      val fieldMapping = new JHashMap[Integer, Integer]()
+      (0 until rightFieldCnt).foreach(idx => fieldMapping.put(idx, 
leftFieldCnt + idx))
+      val rightWindowPropertyAfterWindowJoin = 
rightWindowProperty.permute(fieldMapping)
+      leftWindowProperty.union(rightWindowPropertyAfterWindowJoin)
+    }
+
+    val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
+    val leftWindowProperties = fmq.getRelWindowProperties(rel.getLeft)
+    val rightWindowProperties = fmq.getRelWindowProperties(rel.getRight)
+    assert(leftWindowProperties.getWindowSpec == 
rightWindowProperties.getWindowSpec)

Review comment:
       Do we need to use `assert` here? I think the query is still valid, we 
just can't infer the window properties. I think we can return null in this 
case. 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -266,6 +266,12 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         }
         createNewNode(join, children, providedTrait, requiredTrait, requester)
 
+      case windowJoin: StreamPhysicalWindowJoin =>
+        // window join support all changes in input
+        val children = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES)

Review comment:
       I would suggest to use `INSERT_ONLY` here (and we don't add any tests 
for this). We can 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties
+
+import org.apache.calcite.rex.{RexInputRef, RexNode, RexUtil}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.util.ImmutableIntList
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+/**
+ * Util for window join.
+ */
+object WindowJoinUtil {
+
+  /**
+   * Get window properties of left and right child.
+   *
+   * @param join input join
+   * @return window properties of left and right child.
+   */
+  def getChildWindowProperties(
+      join: FlinkLogicalJoin): (RelWindowProperties, RelWindowProperties) = {
+    val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(join.getCluster.getMetadataQuery)
+    (fmq.getRelWindowProperties(join.getLeft), 
fmq.getRelWindowProperties(join.getRight))
+  }
+
+  /**
+   * Checks whether join condition contains window starts equality of input 
tables or window
+   * ends equality of input tables.
+   *
+   * @param join input join
+   * @return True if join condition contains window starts equality of input 
tables and window
+   *         ends equality of input tables. Else false.
+   */
+  def containsWindowStartEqualityOrEndEquality(join: FlinkLogicalJoin): 
Boolean = {
+    val (windowStartEqualityLeftKeys, windowEndEqualityLeftKeys, _, _) =
+      excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs(join)
+    windowStartEqualityLeftKeys.nonEmpty || windowEndEqualityLeftKeys.nonEmpty
+  }
+
+  /**
+   * Excludes window starts equality and window ends equality from join info.
+   *
+   * @param join input join
+   * @return Remain join information after excludes window starts equality and 
window ends
+   *         equality from join.
+   *         The first element is left join keys of window starts equality,
+   *         the second element is left join keys of window ends equality,
+   *         the third element is right join keys of window starts equality,
+   *         the forth element is right join keys of window ends equality,
+   *         the fifth element is remain left join keys,
+   *         the sixth element is remain right join keys,
+   *         the last element is remain join condition which includes remain 
equal condition and
+   *         non-equal condition.
+   */
+  def excludeWindowStartEqualityAndEndEqualityFromJoinCondition(
+      join: FlinkLogicalJoin): (
+    ImmutableIntList,
+    ImmutableIntList,
+    ImmutableIntList,
+    ImmutableIntList,
+    ImmutableIntList,
+    ImmutableIntList,
+    RexNode) = {
+    val (
+      windowStartEqualityLeftKeys,
+      windowEndEqualityLeftKeys,
+      windowStartEqualityRightKeys,
+      windowEndEqualityRightKeys) =
+      excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs(join)
+
+    val joinInfo = join.analyzeCondition()
+    val (remainLeftKeys, remainRightKeys, remainCondition) = if (
+      windowStartEqualityLeftKeys.nonEmpty || 
windowEndEqualityLeftKeys.nonEmpty) {
+      val joinFieldsType = join.getRowType.getFieldList
+      val leftFieldCnt = join.getLeft.getRowType.getFieldCount
+      val rexBuilder = join.getCluster.getRexBuilder
+      val remainEquals = mutable.ArrayBuffer[RexNode]()
+      val remainLeftKeysArray = mutable.ArrayBuffer[Int]()
+      val remainRightKeysArray = mutable.ArrayBuffer[Int]()
+      // convert remain pairs to RexInputRef tuple for building 
SqlStdOperatorTable.EQUALS calls
+      joinInfo.pairs().foreach { p =>
+        if (!windowStartEqualityLeftKeys.contains(p.source) &&
+          !windowEndEqualityLeftKeys.contains(p.source)) {
+          val leftFieldType = joinFieldsType.get(p.source).getType
+          val leftInputRef = new RexInputRef(p.source, leftFieldType)
+          val rightIndex = leftFieldCnt + p.target
+          val rightFieldType = joinFieldsType.get(rightIndex).getType
+          val rightInputRef = new RexInputRef(rightIndex, rightFieldType)
+          val remainEqual = rexBuilder.makeCall(
+            SqlStdOperatorTable.EQUALS,

Review comment:
       Why the `remainCondition` should contain the join key equality?

##########
File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
##########
@@ -0,0 +1,1041 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testCantMergeWindowTVF_Cumulate">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.a, L.b, L.c, R.a, R.b, R.c
+FROM (
+  SELECT *
+  FROM TABLE(
+  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL 
'1' HOUR))
+) L
+JOIN (
+  SELECT *
+  FROM TABLE(
+  CUMULATE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL 
'1' HOUR))
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$8], b0=[$9], c0=[$10])
++- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], 
joinType=[inner])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+   :  +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($3), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+   :     +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+   :        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :           +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :              +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+      +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($3), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                  +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, a0, b0, c0])
++- WindowJoin(leftWindow=[CUMULATE(win_start=[window_start], 
win_end=[window_end], max_size=[1 h], step=[10 min])], 
rightWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], 
max_size=[1 h], step=[10 min])], joinType=[InnerJoin], where=[=(a, a0)], 
select=[a, b, c, window_start, window_end, a0, b0, c0, window_start0, 
window_end0])

Review comment:
       Note: when implementing window join operator, we can't support proctime 
window join for now, becuase the window assigner is in an separate operator, 
that means we don't know when to  trigger the processing-time window (we don't 
have something like watermark for proctime). We need to merge window assigner 
into window join to support proctime window join, but this can be a future 
work. 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties
+
+import org.apache.calcite.rex.{RexInputRef, RexNode, RexUtil}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.util.ImmutableIntList
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+/**
+ * Util for window join.
+ */
+object WindowJoinUtil {
+
+  /**
+   * Get window properties of left and right child.
+   *
+   * @param join input join
+   * @return window properties of left and right child.
+   */
+  def getChildWindowProperties(
+      join: FlinkLogicalJoin): (RelWindowProperties, RelWindowProperties) = {
+    val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(join.getCluster.getMetadataQuery)
+    (fmq.getRelWindowProperties(join.getLeft), 
fmq.getRelWindowProperties(join.getRight))
+  }
+
+  /**
+   * Checks whether join condition contains window starts equality of input 
tables or window
+   * ends equality of input tables.
+   *
+   * @param join input join
+   * @return True if join condition contains window starts equality of input 
tables and window
+   *         ends equality of input tables. Else false.
+   */
+  def containsWindowStartEqualityOrEndEquality(join: FlinkLogicalJoin): 
Boolean = {
+    val (windowStartEqualityLeftKeys, windowEndEqualityLeftKeys, _, _) =
+      excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs(join)
+    windowStartEqualityLeftKeys.nonEmpty || windowEndEqualityLeftKeys.nonEmpty

Review comment:
       I think it would be better to use `&&`, because we don't allow start OR 
end for now in `excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs`. 

##########
File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
##########
@@ -0,0 +1,1041 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testCantMergeWindowTVF_Cumulate">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.a, L.b, L.c, R.a, R.b, R.c
+FROM (
+  SELECT *
+  FROM TABLE(
+  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL 
'1' HOUR))
+) L
+JOIN (
+  SELECT *
+  FROM TABLE(
+  CUMULATE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL 
'1' HOUR))
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$8], b0=[$9], c0=[$10])
++- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], 
joinType=[inner])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+   :  +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($3), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+   :     +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+   :        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :           +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :              +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+      +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($3), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                  +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, a0, b0, c0])
++- WindowJoin(leftWindow=[CUMULATE(win_start=[window_start], 
win_end=[window_end], max_size=[1 h], step=[10 min])], 
rightWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], 
max_size=[1 h], step=[10 min])], joinType=[InnerJoin], where=[=(a, a0)], 
select=[a, b, c, window_start, window_end, a0, b0, c0, window_start0, 
window_end0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, b, c, window_start, window_end])
+   :     +- WindowTableFunction(window=[CUMULATE(time_col=[rowtime], 
max_size=[1 h], step=[10 min])])
+   :        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+   :           +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+   :              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, b, c, window_start, window_end])
+         +- WindowTableFunction(window=[CUMULATE(time_col=[rowtime], 
max_size=[1 h], step=[10 min])])
+            +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+               +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCantMergeWindowTVF_CumulateOnProctime">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.a, L.b, L.c, R.a, R.b, R.c
+FROM (
+  SELECT *
+  FROM TABLE(
+  CUMULATE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL 
'1' HOUR))
+) L
+JOIN (
+  SELECT *
+  FROM TABLE(
+  CUMULATE(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, 
INTERVAL '1' HOUR))
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$8], b0=[$9], c0=[$10])
++- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], 
joinType=[inner])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+   :  +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($4), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(PROCTIME) window_time)])
+   :     +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+   :        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :           +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :              +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+      +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($4), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(PROCTIME) window_time)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                  +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, a0, b0, c0])
++- WindowJoin(leftWindow=[CUMULATE(win_start=[window_start], 
win_end=[window_end], max_size=[1 h], step=[10 min])], 
rightWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], 
max_size=[1 h], step=[10 min])], joinType=[InnerJoin], where=[=(a, a0)], 
select=[a, b, c, window_start, window_end, a0, b0, c0, window_start0, 
window_end0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, b, c, window_start, window_end])
+   :     +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], 
max_size=[1 h], step=[10 min])])
+   :        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+   :           +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+   :              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, b, c, window_start, window_end])
+         +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], 
max_size=[1 h], step=[10 min])])
+            +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+               +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCantMergeWindowTVF_Hop">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.a, L.b, L.c, R.a, R.b, R.c
+FROM (
+  SELECT *
+  FROM TABLE(
+  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' 
MINUTE))
+) L
+JOIN (
+  SELECT *
+  FROM TABLE(
+  HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' 
MINUTE))
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$8], b0=[$9], c0=[$10])
++- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], 
joinType=[inner])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+   :  +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($3), 
300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+   :     +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+   :        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :           +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :              +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+      +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($3), 
300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                  +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, a0, b0, c0])
++- WindowJoin(leftWindow=[HOP(win_start=[window_start], win_end=[window_end], 
size=[10 min], slide=[5 min])], rightWindow=[HOP(win_start=[window_start], 
win_end=[window_end], size=[10 min], slide=[5 min])], joinType=[InnerJoin], 
where=[=(a, a0)], select=[a, b, c, window_start, window_end, a0, b0, c0, 
window_start0, window_end0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, b, c, window_start, window_end])
+   :     +- WindowTableFunction(window=[HOP(time_col=[rowtime], size=[10 min], 
slide=[5 min])])
+   :        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+   :           +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+   :              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, b, c, window_start, window_end])
+         +- WindowTableFunction(window=[HOP(time_col=[rowtime], size=[10 min], 
slide=[5 min])])
+            +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+               +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCantMergeWindowTVF_HopOnProctime">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.a, L.b, L.c, R.a, R.b, R.c
+FROM (
+  SELECT *
+  FROM TABLE(
+  HOP(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' 
MINUTE))
+) L
+JOIN (
+  SELECT *
+  FROM TABLE(
+  HOP(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' 
MINUTE))
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$8], b0=[$9], c0=[$10])
++- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], 
joinType=[inner])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+   :  +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($4), 
300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(PROCTIME) window_time)])
+   :     +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+   :        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :           +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :              +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+      +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($4), 
300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(PROCTIME) window_time)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                  +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, a0, b0, c0])
++- WindowJoin(leftWindow=[HOP(win_start=[window_start], win_end=[window_end], 
size=[10 min], slide=[5 min])], rightWindow=[HOP(win_start=[window_start], 
win_end=[window_end], size=[10 min], slide=[5 min])], joinType=[InnerJoin], 
where=[=(a, a0)], select=[a, b, c, window_start, window_end, a0, b0, c0, 
window_start0, window_end0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, b, c, window_start, window_end])
+   :     +- WindowTableFunction(window=[HOP(time_col=[proctime], size=[10 
min], slide=[5 min])])
+   :        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+   :           +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+   :              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, b, c, window_start, window_end])
+         +- WindowTableFunction(window=[HOP(time_col=[proctime], size=[10 
min], slide=[5 min])])
+            +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+               +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCantMergeWindowTVF_Tumble">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.a, L.b, L.c, R.a, R.b, R.c
+FROM (
+  SELECT *
+  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+) L
+JOIN (
+  SELECT *
+  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$8], b0=[$9], c0=[$10])
++- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], 
joinType=[inner])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+   :  +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) 
window_time)])
+   :     +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+   :        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :           +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :              +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+      +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) 
window_time)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                  +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, a0, b0, c0])
++- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[15 min])], 
rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 
min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, window_start, 
window_end, a0, b0, c0, window_start0, window_end0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, b, c, window_start, window_end])
+   :     +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 
min])])
+   :        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+   :           +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+   :              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, b, c, window_start, window_end])
+         +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 
min])])
+            +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+               +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCantMergeWindowTVF_TumbleOnProctime">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.a, L.b, L.c, R.a, R.b, R.c
+FROM (
+  SELECT *
+  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))
+) L
+JOIN (
+  SELECT *
+  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '15' 
MINUTE))
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$8], b0=[$9], c0=[$10])
++- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], 
joinType=[inner])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+   :  +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($4), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) 
window_time)])
+   :     +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+   :        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :           +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :              +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+      +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($4), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) 
window_time)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                  +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, a0, b0, c0])
++- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[15 min])], 
rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 
min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, window_start, 
window_end, a0, b0, c0, window_start0, window_end0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, b, c, window_start, window_end])
+   :     +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[15 
min])])
+   :        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+   :           +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+   :              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, b, c, window_start, window_end])
+         +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[15 
min])])
+            +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+               +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testOnCumulateWindowAggregate">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.*, R.*
+FROM (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(
+    CUMULATE(
+      TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' 
HOUR))
+  GROUP BY a, window_start, window_end, window_time
+) L
+JOIN (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(
+    CUMULATE(
+      TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' 
HOUR))
+  GROUP BY a, window_start, window_end, window_time
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], 
cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], 
window_time0=[$9], cnt0=[$10], uv0=[$11])
++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], 
joinType=[inner])
+   :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+   :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+   :     +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($3), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+   :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :           +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :              +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+      +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+         +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($3), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+WindowJoin(leftWindow=[CUMULATE(win_start=[window_start], 
win_end=[window_end], max_size=[1 h], step=[10 min])], 
rightWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], 
max_size=[1 h], step=[10 min])], joinType=[InnerJoin], where=[=(a, a0)], 
select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, 
window_end0, window_time0, cnt0, uv0])
+:- Exchange(distribution=[hash[a]])
+:  +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+:     +- WindowAggregate(groupBy=[a], window=[CUMULATE(time_col=[rowtime], 
max_size=[1 h], step=[10 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) 
AS uv, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS 
window_time])
+:        +- Exchange(distribution=[hash[a]])
+:           +- Calc(select=[a, c, rowtime])
+:              +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+:                 +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+:                    +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
++- Exchange(distribution=[hash[a]])
+   +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+      +- WindowAggregate(groupBy=[a], window=[CUMULATE(time_col=[rowtime], 
max_size=[1 h], step=[10 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) 
AS uv, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS 
window_time])
+         +- Exchange(distribution=[hash[a]])
+            +- Calc(select=[a, c, rowtime])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+                  +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTimeAttributePropagateForWindowJoin1">
+    <Resource name="sql">
+      <![CDATA[
+SELECT tmp1.*, MyTable4.* FROM tmp1 JOIN MyTable4 ON
+ tmp1.a = MyTable4.a AND
+ tmp1.rowtime BETWEEN
+   MyTable4.rowtime - INTERVAL '10' SECOND AND
+   MyTable4.rowtime + INTERVAL '1' HOUR
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(rowtime=[$0], a=[$1], l_cnt=[$2], l_uv=[$3], r_cnt=[$4], 
r_uv=[$5], a0=[$6], b=[$7], c=[$8], rowtime0=[$9], proctime=[$10])
++- LogicalJoin(condition=[AND(=($1, $6), >=($0, -($9, 10000:INTERVAL SECOND)), 
<=($0, +($9, 3600000:INTERVAL HOUR)))], joinType=[inner])
+   :- LogicalProject(rowtime=[$3], a=[$0], l_cnt=[$4], l_uv=[$5], r_cnt=[$10], 
r_uv=[$11])
+   :  +- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], 
joinType=[inner])
+   :     :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], 
uv=[COUNT(DISTINCT $4)])
+   :     :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+   :     :     +- LogicalTableFunctionScan(invocation=[CUMULATE($4, 
DESCRIPTOR($3), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], 
rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME 
ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+   :     :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :     :           +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($3, 1000:INTERVAL SECOND)])
+   :     :              +- LogicalProject(a=[$0], b=[$1], c=[$2], 
rowtime=[$3], proctime=[PROCTIME()])
+   :     :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   :     +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], 
uv=[COUNT(DISTINCT $4)])
+   :        +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+   :           +- LogicalTableFunctionScan(invocation=[CUMULATE($4, 
DESCRIPTOR($3), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], 
rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME 
ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+   :              +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :                 +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($3, 1000:INTERVAL SECOND)])
+   :                    +- LogicalProject(a=[$0], b=[$1], c=[$2], 
rowtime=[$3], proctime=[PROCTIME()])
+   :                       +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable4]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[rowtime, a, l_cnt, l_uv, r_cnt, r_uv, a0, b, c, rowtime0, 
PROCTIME_MATERIALIZE(proctime) AS proctime])
++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-10000, leftUpperBound=3600000, leftTimeIndex=0, 
rightTimeIndex=3], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 10000:INTERVAL 
SECOND)), <=(rowtime, +(rowtime0, 3600000:INTERVAL HOUR)))], select=[rowtime, 
a, l_cnt, l_uv, r_cnt, r_uv, a0, b, c, rowtime0, proctime])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[window_time AS rowtime, a, cnt AS l_cnt, uv AS l_uv, 
cnt0 AS r_cnt, uv0 AS r_uv])
+   :     +- WindowJoin(leftWindow=[CUMULATE(win_start=[window_start], 
win_end=[window_end], max_size=[1 h], step=[10 min])], 
rightWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], 
max_size=[1 h], step=[10 min])], joinType=[InnerJoin], where=[=(a, a0)], 
select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, 
window_end0, cnt0, uv0])
+   :        :- Exchange(distribution=[hash[a]])
+   :        :  +- Calc(select=[a, window_start, window_end, window_time, cnt, 
uv])
+   :        :     +- WindowAggregate(groupBy=[a], 
window=[CUMULATE(time_col=[rowtime], max_size=[1 h], step=[10 min])], 
select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) AS 
window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
+   :        :        +- Exchange(distribution=[hash[a]])
+   :        :           +- Calc(select=[a, c, rowtime])
+   :        :              +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+   :        :                 +- Calc(select=[a, b, c, rowtime, PROCTIME() AS 
proctime])
+   :        :                    +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   :        +- Exchange(distribution=[hash[a]])
+   :           +- Calc(select=[a, window_start, window_end, cnt, uv])
+   :              +- WindowAggregate(groupBy=[a], 
window=[CUMULATE(time_col=[rowtime], max_size=[1 h], step=[10 min])], 
select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) AS 
window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
+   :                 +- Exchange(distribution=[hash[a]])
+   :                    +- Calc(select=[a, c, rowtime])
+   :                       +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+   :                          +- Calc(select=[a, b, c, rowtime, PROCTIME() AS 
proctime])
+   :                             +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+         +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable4]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testOnCumulateWindowAggregateOnProctime">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.*, R.*
+FROM (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(
+    CUMULATE(
+      TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' 
HOUR))
+  GROUP BY a, window_start, window_end, window_time
+) L
+JOIN (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(
+    CUMULATE(
+      TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' 
HOUR))
+  GROUP BY a, window_start, window_end, window_time
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], 
cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], 
window_time0=[$9], cnt0=[$10], uv0=[$11])
++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], 
joinType=[inner])
+   :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+   :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+   :     +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($4), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(PROCTIME) window_time)])
+   :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :           +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :              +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+      +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+         +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($4), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(PROCTIME) window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS 
window_time, cnt, uv, a0, window_start0, window_end0, 
PROCTIME_MATERIALIZE(window_time0) AS window_time0, cnt0, uv0])
++- WindowJoin(leftWindow=[CUMULATE(win_start=[window_start], 
win_end=[window_end], max_size=[1 h], step=[10 min])], 
rightWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], 
max_size=[1 h], step=[10 min])], joinType=[InnerJoin], where=[=(a, a0)], 
select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, 
window_end0, window_time0, cnt0, uv0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+   :     +- WindowAggregate(groupBy=[a], window=[CUMULATE(time_col=[proctime], 
max_size=[1 h], step=[10 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) 
AS uv, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS 
window_time])
+   :        +- Exchange(distribution=[hash[a]])
+   :           +- Calc(select=[a, c, proctime])
+   :              +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+   :                 +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+   :                    +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+         +- WindowAggregate(groupBy=[a], window=[CUMULATE(time_col=[proctime], 
max_size=[1 h], step=[10 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) 
AS uv, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS 
window_time])
+            +- Exchange(distribution=[hash[a]])
+               +- Calc(select=[a, c, proctime])
+                  +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                     +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testOnHopWindowAggregate">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.*, R.*
+FROM (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(
+  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' 
MINUTE))
+  GROUP BY a, window_start, window_end, window_time
+) L
+JOIN (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(
+  HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' 
MINUTE))
+  GROUP BY a, window_start, window_end, window_time
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], 
cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], 
window_time0=[$9], cnt0=[$10], uv0=[$11])
++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], 
joinType=[inner])
+   :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+   :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+   :     +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($3), 
300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+   :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :           +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :              +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+      +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+         +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($3), 
300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+WindowJoin(leftWindow=[HOP(win_start=[window_start], win_end=[window_end], 
size=[10 min], slide=[5 min])], rightWindow=[HOP(win_start=[window_start], 
win_end=[window_end], size=[10 min], slide=[5 min])], joinType=[InnerJoin], 
where=[=(a, a0)], select=[a, window_start, window_end, window_time, cnt, uv, 
a0, window_start0, window_end0, window_time0, cnt0, uv0])
+:- Exchange(distribution=[hash[a]])
+:  +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+:     +- WindowAggregate(groupBy=[a], window=[HOP(time_col=[rowtime], size=[10 
min], slide=[5 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, 
start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS 
window_time])
+:        +- Exchange(distribution=[hash[a]])
+:           +- Calc(select=[a, c, rowtime])
+:              +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+:                 +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+:                    +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
++- Exchange(distribution=[hash[a]])
+   +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+      +- WindowAggregate(groupBy=[a], window=[HOP(time_col=[rowtime], size=[10 
min], slide=[5 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, 
start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS 
window_time])
+         +- Exchange(distribution=[hash[a]])
+            +- Calc(select=[a, c, rowtime])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+                  +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testOnHopWindowAggregateOnProctime">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.*, R.*
+FROM (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(
+  HOP(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' 
MINUTE))
+  GROUP BY a, window_start, window_end, window_time
+) L
+JOIN (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(
+  HOP(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' 
MINUTE))
+  GROUP BY a, window_start, window_end, window_time
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], 
cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], 
window_time0=[$9], cnt0=[$10], uv0=[$11])
++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], 
joinType=[inner])
+   :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+   :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+   :     +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($4), 
300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(PROCTIME) window_time)])
+   :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :           +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :              +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+      +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+         +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($4), 
300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(PROCTIME) window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS 
window_time, cnt, uv, a0, window_start0, window_end0, 
PROCTIME_MATERIALIZE(window_time0) AS window_time0, cnt0, uv0])
++- WindowJoin(leftWindow=[HOP(win_start=[window_start], win_end=[window_end], 
size=[10 min], slide=[5 min])], rightWindow=[HOP(win_start=[window_start], 
win_end=[window_end], size=[10 min], slide=[5 min])], joinType=[InnerJoin], 
where=[=(a, a0)], select=[a, window_start, window_end, window_time, cnt, uv, 
a0, window_start0, window_end0, window_time0, cnt0, uv0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+   :     +- WindowAggregate(groupBy=[a], window=[HOP(time_col=[proctime], 
size=[10 min], slide=[5 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) 
AS uv, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS 
window_time])
+   :        +- Exchange(distribution=[hash[a]])
+   :           +- Calc(select=[a, c, proctime])
+   :              +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+   :                 +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+   :                    +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+         +- WindowAggregate(groupBy=[a], window=[HOP(time_col=[proctime], 
size=[10 min], slide=[5 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) 
AS uv, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS 
window_time])
+            +- Exchange(distribution=[hash[a]])
+               +- Calc(select=[a, c, proctime])
+                  +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                     +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testOnTumbleWindowAggregate">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.*, R.*
+FROM (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+  GROUP BY a, window_start, window_end, window_time
+) L
+JOIN (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+  GROUP BY a, window_start, window_end, window_time
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], 
cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], 
window_time0=[$9], cnt0=[$10], uv0=[$11])
++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], 
joinType=[inner])
+   :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+   :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+   :     +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) 
window_time)])
+   :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :           +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :              +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+      +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+         +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) 
window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], 
size=[15 min])], rightWindow=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[15 min])], joinType=[InnerJoin], where=[=(a, a0)], 
select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, 
window_end0, window_time0, cnt0, uv0])
+:- Exchange(distribution=[hash[a]])
+:  +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+:     +- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], 
size=[15 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, 
start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS 
window_time])
+:        +- Exchange(distribution=[hash[a]])
+:           +- Calc(select=[a, c, rowtime])
+:              +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+:                 +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+:                    +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
++- Exchange(distribution=[hash[a]])
+   +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+      +- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], 
size=[15 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, 
start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS 
window_time])
+         +- Exchange(distribution=[hash[a]])
+            +- Calc(select=[a, c, rowtime])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+                  +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testOnTumbleWindowAggregateOnProctime">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.*, R.*
+FROM (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))
+  GROUP BY a, window_start, window_end, window_time
+) L
+JOIN (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '15' 
MINUTE))
+  GROUP BY a, window_start, window_end, window_time
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], 
cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], 
window_time0=[$9], cnt0=[$10], uv0=[$11])
++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], 
joinType=[inner])
+   :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+   :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+   :     +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($4), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) 
window_time)])
+   :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :           +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :              +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+      +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+         +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($4), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) 
window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS 
window_time, cnt, uv, a0, window_start0, window_end0, 
PROCTIME_MATERIALIZE(window_time0) AS window_time0, cnt0, uv0])
++- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[15 min])], 
rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 
min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, window_start, 
window_end, window_time, cnt, uv, a0, window_start0, window_end0, window_time0, 
cnt0, uv0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+   :     +- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[proctime], 
size=[15 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, 
start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS 
window_time])
+   :        +- Exchange(distribution=[hash[a]])
+   :           +- Calc(select=[a, c, proctime])
+   :              +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+   :                 +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+   :                    +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+         +- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[proctime], 
size=[15 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, 
start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS 
window_time])
+            +- Exchange(distribution=[hash[a]])
+               +- Calc(select=[a, c, proctime])
+                  +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                     +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTimeAttributePropagateForWindowJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT tmp.*, MyTable3.* FROM tmp JOIN MyTable3 ON
+ tmp.a = MyTable3.a AND
+ tmp.rowtime BETWEEN
+   MyTable3.rowtime - INTERVAL '10' SECOND AND
+   MyTable3.rowtime + INTERVAL '1' HOUR
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(rowtime=[$0], a=[$1], l_b=[$2], l_c=[$3], r_b=[$4], r_c=[$5], 
a0=[$6], b=[$7], c=[$8], rowtime0=[$9], proctime=[$10])
++- LogicalJoin(condition=[AND(=($1, $6), >=($0, -($9, 10000:INTERVAL SECOND)), 
<=($0, +($9, 3600000:INTERVAL HOUR)))], joinType=[inner])
+   :- LogicalProject(rowtime=[$7], a=[$0], l_b=[$1], l_c=[$2], r_b=[$9], 
r_c=[$10])
+   :  +- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], 
joinType=[inner])
+   :     :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4], window_start=[$5], window_end=[$6], window_time=[$7])
+   :     :  +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) 
window_time)])
+   :     :     +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :     :        +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($3, 1000:INTERVAL SECOND)])
+   :     :           +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :     :              +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   :     +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4], window_start=[$5], window_end=[$6], window_time=[$7])
+   :        +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) 
window_time)])
+   :           +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :              +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($3, 1000:INTERVAL SECOND)])
+   :                 +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :                    +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable3]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[rowtime, a, l_b, l_c, r_b, r_c, a0, b, c, rowtime0, 
PROCTIME_MATERIALIZE(proctime) AS proctime])
++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-10000, leftUpperBound=3600000, leftTimeIndex=0, 
rightTimeIndex=3], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 10000:INTERVAL 
SECOND)), <=(rowtime, +(rowtime0, 3600000:INTERVAL HOUR)))], select=[rowtime, 
a, l_b, l_c, r_b, r_c, a0, b, c, rowtime0, proctime])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[window_time AS rowtime, a, b AS l_b, c AS l_c, b0 AS 
r_b, c0 AS r_c])
+   :     +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[15 min])], 
rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 
min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, window_start, 
window_end, window_time, a0, b0, c0, window_start0, window_end0])
+   :        :- Exchange(distribution=[hash[a]])
+   :        :  +- Calc(select=[a, b, c, window_start, window_end, window_time])
+   :        :     +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], 
size=[15 min])])
+   :        :        +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+   :        :           +- Calc(select=[a, b, c, rowtime, PROCTIME() AS 
proctime])
+   :        :              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   :        +- Exchange(distribution=[hash[a]])
+   :           +- Calc(select=[a, b, c, window_start, window_end])
+   :              +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], 
size=[15 min])])
+   :                 +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+   :                    +- Calc(select=[a, b, c, rowtime, PROCTIME() AS 
proctime])
+   :                       +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+         +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable3]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWindowJoinWithNonEqui">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.*, R.*
+FROM (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(
+    CUMULATE(
+      TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' 
HOUR))
+  GROUP BY a, window_start, window_end, window_time
+) L
+JOIN (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(
+    CUMULATE(
+      TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' 
HOUR))
+  GROUP BY a, window_start, window_end, window_time
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a AND
+ CAST(L.window_start AS BIGINT) > R.uv
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], 
cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], 
window_time0=[$9], cnt0=[$10], uv0=[$11])
++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6), 
>(CAST($1):BIGINT NOT NULL, $11))], joinType=[inner])
+   :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+   :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+   :     +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($4), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(PROCTIME) window_time)])
+   :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :           +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :              +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+      +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+         +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($4), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(PROCTIME) window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS 
window_time, cnt, uv, a0, window_start0, window_end0, 
PROCTIME_MATERIALIZE(window_time0) AS window_time0, cnt0, uv0])
++- WindowJoin(leftWindow=[CUMULATE(win_start=[window_start], 
win_end=[window_end], max_size=[1 h], step=[10 min])], 
rightWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], 
max_size=[1 h], step=[10 min])], joinType=[InnerJoin], where=[AND(=(a, a0), 
>(CAST(window_start), uv0))], select=[a, window_start, window_end, window_time, 
cnt, uv, a0, window_start0, window_end0, window_time0, cnt0, uv0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+   :     +- WindowAggregate(groupBy=[a], window=[CUMULATE(time_col=[proctime], 
max_size=[1 h], step=[10 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) 
AS uv, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS 
window_time])
+   :        +- Exchange(distribution=[hash[a]])
+   :           +- Calc(select=[a, c, proctime])
+   :              +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+   :                 +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+   :                    +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+         +- WindowAggregate(groupBy=[a], window=[CUMULATE(time_col=[proctime], 
max_size=[1 h], step=[10 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) 
AS uv, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS 
window_time])
+            +- Exchange(distribution=[hash[a]])
+               +- Calc(select=[a, c, proctime])
+                  +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                     +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWindowPropertyPropagateForWindowJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM
+(
+  SELECT *,
+    ROW_NUMBER() OVER(
+      PARTITION BY window_start, window_end ORDER BY l_cnt DESC) as rownum
+  FROM tmp2
+)
+WHERE rownum <= 3
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(window_start=[$0], window_end=[$1], a=[$2], l_cnt=[$3], 
l_uv=[$4], r_cnt=[$5], r_uv=[$6], rownum=[$7])
++- LogicalFilter(condition=[<=($7, 3)])
+   +- LogicalProject(window_start=[$1], window_end=[$2], a=[$0], l_cnt=[$4], 
l_uv=[$5], r_cnt=[$10], r_uv=[$11], rownum=[ROW_NUMBER() OVER (PARTITION BY $1, 
$2 ORDER BY $4 DESC NULLS LAST)])
+      +- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], 
joinType=[inner])
+         :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], 
uv=[COUNT(DISTINCT $4)])
+         :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+         :     +- LogicalTableFunctionScan(invocation=[CUMULATE($4, 
DESCRIPTOR($3), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], 
rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME 
ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+         :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+         :           +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($3, 1000:INTERVAL SECOND)])
+         :              +- LogicalProject(a=[$0], b=[$1], c=[$2], 
rowtime=[$3], proctime=[PROCTIME()])
+         :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+         +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], 
uv=[COUNT(DISTINCT $4)])
+            +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+               +- LogicalTableFunctionScan(invocation=[CUMULATE($4, 
DESCRIPTOR($3), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], 
rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME 
ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+                     +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($3, 1000:INTERVAL SECOND)])
+                        +- LogicalProject(a=[$0], b=[$1], c=[$2], 
rowtime=[$3], proctime=[PROCTIME()])
+                           +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[window_start, window_end, a, cnt AS l_cnt, uv AS l_uv, cnt0 AS 
r_cnt, uv0 AS r_uv, rownum])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=3], partitionBy=[window_start, window_end], 
orderBy=[cnt DESC], select=[a, window_start, window_end, window_time, cnt, uv, 
a0, window_start0, window_end0, window_time0, cnt0, uv0, rownum])

Review comment:
       Does it mean window properties are not propagated? Because here is 
`Rank` instead of  `WindowRank`.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
##########
@@ -0,0 +1,922 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.stream.sql.join
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.planner.utils.{StreamTableTestUtil, 
TableTestBase}
+
+import org.junit.Test
+
+/**
+ * Tests for window join.
+ */
+class WindowJoinTest extends TableTestBase {
+
+  private val util: StreamTableTestUtil = streamTestUtil()
+  util.tableEnv.executeSql(
+    s"""
+       |CREATE TABLE MyTable (
+       |  a INT,
+       |  b STRING NOT NULL,
+       |  c BIGINT,
+       |  rowtime TIMESTAMP(3),
+       |  proctime as PROCTIME(),
+       |  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND
+       |) with (
+       |  'connector' = 'values'
+       |)
+       |""".stripMargin)
+
+  util.tableEnv.executeSql(
+    s"""
+       |CREATE TABLE MyTable2 (
+       |  a INT,
+       |  b STRING NOT NULL,
+       |  c BIGINT,
+       |  rowtime TIMESTAMP(3),
+       |  proctime as PROCTIME(),
+       |  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND
+       |) with (
+       |  'connector' = 'values'
+       |)
+       |""".stripMargin)
+
+  // 
----------------------------------------------------------------------------------------
+  // Tests for queries Join on window TVF
+  // Current does not support merge Window TVF into WindowJoin.
+  // 
----------------------------------------------------------------------------------------
+
+  @Test
+  def testCantMergeWindowTVF_Tumble(): Unit = {
+    val sql =
+      """
+        |SELECT L.a, L.b, L.c, R.a, R.b, R.c
+        |FROM (
+        |  SELECT *
+        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' 
MINUTE))
+        |) L
+        |JOIN (
+        |  SELECT *
+        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL 
'15' MINUTE))
+        |) R
+        |ON L.window_start = R.window_start AND L.window_end = R.window_end 
AND L.a = R.a
+      """.stripMargin
+    util.verifyRelPlan(sql)
+  }
+
+  @Test
+  def testCantMergeWindowTVF_TumbleOnProctime(): Unit = {
+    val sql =
+      """
+        |SELECT L.a, L.b, L.c, R.a, R.b, R.c
+        |FROM (
+        |  SELECT *
+        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL 
'15' MINUTE))
+        |) L
+        |JOIN (
+        |  SELECT *
+        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL 
'15' MINUTE))
+        |) R
+        |ON L.window_start = R.window_start AND L.window_end = R.window_end 
AND L.a = R.a
+      """.stripMargin
+    util.verifyRelPlan(sql)
+  }
+
+  @Test
+  def testCantMergeWindowTVF_Hop(): Unit = {
+    val sql =
+      """
+        |SELECT L.a, L.b, L.c, R.a, R.b, R.c
+        |FROM (
+        |  SELECT *
+        |  FROM TABLE(
+        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, 
INTERVAL '10' MINUTE))
+        |) L
+        |JOIN (
+        |  SELECT *
+        |  FROM TABLE(
+        |  HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, 
INTERVAL '10' MINUTE))
+        |) R
+        |ON L.window_start = R.window_start AND L.window_end = R.window_end 
AND L.a = R.a
+      """.stripMargin
+    util.verifyRelPlan(sql)
+  }
+
+  @Test
+  def testCantMergeWindowTVF_HopOnProctime(): Unit = {
+    val sql =
+      """
+        |SELECT L.a, L.b, L.c, R.a, R.b, R.c
+        |FROM (
+        |  SELECT *
+        |  FROM TABLE(
+        |  HOP(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, 
INTERVAL '10' MINUTE))
+        |) L
+        |JOIN (
+        |  SELECT *
+        |  FROM TABLE(
+        |  HOP(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, 
INTERVAL '10' MINUTE))
+        |) R
+        |ON L.window_start = R.window_start AND L.window_end = R.window_end 
AND L.a = R.a
+      """.stripMargin
+    util.verifyRelPlan(sql)
+  }
+
+  @Test
+  def testCantMergeWindowTVF_Cumulate(): Unit = {
+    val sql =
+      """
+        |SELECT L.a, L.b, L.c, R.a, R.b, R.c
+        |FROM (
+        |  SELECT *
+        |  FROM TABLE(
+        |  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, 
INTERVAL '1' HOUR))
+        |) L
+        |JOIN (
+        |  SELECT *
+        |  FROM TABLE(
+        |  CUMULATE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, 
INTERVAL '1' HOUR))
+        |) R
+        |ON L.window_start = R.window_start AND L.window_end = R.window_end 
AND L.a = R.a
+      """.stripMargin
+    util.verifyRelPlan(sql)
+  }
+
+  @Test
+  def testCantMergeWindowTVF_CumulateOnProctime(): Unit = {
+    val sql =
+      """
+        |SELECT L.a, L.b, L.c, R.a, R.b, R.c
+        |FROM (
+        |  SELECT *
+        |  FROM TABLE(
+        |  CUMULATE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, 
INTERVAL '1' HOUR))
+        |) L
+        |JOIN (
+        |  SELECT *
+        |  FROM TABLE(
+        |  CUMULATE(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '10' 
MINUTE, INTERVAL '1' HOUR))
+        |) R
+        |ON L.window_start = R.window_start AND L.window_end = R.window_end 
AND L.a = R.a
+      """.stripMargin
+    util.verifyRelPlan(sql)
+  }
+
+  // 
----------------------------------------------------------------------------------------
+  // Tests for invalid queries Join on window Aggregate
+  // because left window strategy is not equals to right window strategy.
+  // 
----------------------------------------------------------------------------------------
+
+  /** Window type in left and right child should be same **/
+  @Test(expected = classOf[TableException])

Review comment:
       Use `thrown.expect(...)` and `thrown.expectMessage(..)` to assert 
exception and error message.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowJoin.scala
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.planner.plan.logical.WindowingStrategy
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode
+import 
org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalJoin
+import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
+import 
org.apache.flink.table.planner.plan.utils.RelExplainUtil.preferExpressionFormat
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.core.{Join, JoinRelType}
+import org.apache.calcite.rex.RexNode
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+ * The window join requires the join condition contains window starts equality 
of
+ * input tables and window ends equality of input tables.
+ * The semantic of window join is the same to the DataStream window join.
+ */
+class StreamPhysicalWindowJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftRel: RelNode,
+    rightRel: RelNode,
+    joinType: JoinRelType,
+    // remaining join condition contains all of join condition except window 
starts equality
+    // and window end equality
+    remainingCondition: RexNode,
+    val leftWindowing: WindowingStrategy,
+    val rightWindowing: WindowingStrategy)

Review comment:
       It seems that we do not allow different window strategy for left and 
right input. I think we can simplify the design to only have a single window 
strategy. We can evolve it in the future. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to