Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r137031980 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala --- @@ -115,10 +118,15 @@ object WindowJoinUtil { case _ => Some(otherPreds.reduceLeft((l, r) => RelOptUtil.andJoinFilters(rexBuilder, l, r))) } - - val bounds = Some(WindowBounds(timePreds.head.isEventTime, leftLowerBound, leftUpperBound)) - - (bounds, remainCondition) + if (timePreds.head.leftInputOnLeftSide) { --- End diff -- Please replace the condition as follows: ``` val bounds = if (timePreds.head.leftInputOnLeftSide) { Some(WindowBounds( timePreds.head.isEventTime, leftLowerBound, leftUpperBound, timePreds.head.leftTimeIdx, timePreds.head.rightTimeIdx)) } else { Some(WindowBounds( timePreds.head.isEventTime, leftLowerBound, leftUpperBound, timePreds.head.rightTimeIdx, timePreds.head.leftTimeIdx)) } (bounds, remainCondition) ``` We should not get the right index from the second time predicate because the tables might be accessed in inverse order there (we would need to check `leftInputOnLeftSide` again).
---