beyond1920 commented on a change in pull request #15195:
URL: https://github.com/apache/flink/pull/15195#discussion_r594017064



##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala
##########
@@ -315,6 +315,38 @@ class FlinkRelMdWindowProperties private extends 
MetadataHandler[FlinkMetadata.W
     fmq.getRelWindowProperties(rel.getLeft)
   }
 
+  def getWindowProperties(
+      rel: StreamPhysicalWindowJoin,
+      mq: RelMetadataQuery): RelWindowProperties = {
+    val leftFieldCnt = rel.getLeft.getRowType.getFieldCount
+    val rightFieldCnt = rel.getRight.getRowType.getFieldCount
+
+    def inferWindowPropertyAfterWindowJoin(
+        leftWindowProperty: ImmutableBitSet,
+        rightWindowProperty: ImmutableBitSet): ImmutableBitSet = {
+      val fieldMapping = new JHashMap[Integer, Integer]()
+      (0 until rightFieldCnt).foreach(idx => fieldMapping.put(idx, 
leftFieldCnt + idx))
+      val rightWindowPropertyAfterWindowJoin = 
rightWindowProperty.permute(fieldMapping)
+      leftWindowProperty.union(rightWindowPropertyAfterWindowJoin)
+    }
+
+    val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
+    val leftWindowProperties = fmq.getRelWindowProperties(rel.getLeft)
+    val rightWindowProperties = fmq.getRelWindowProperties(rel.getRight)
+    assert(leftWindowProperties.getWindowSpec == 
rightWindowProperties.getWindowSpec)

Review comment:
       The case will not happen because the node will not be translated to 
WindowJoin if `WindowSpec` is different.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to