Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r137225205 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala --- @@ -55,8 +55,10 @@ class DataStreamWindowJoinRule if (windowBounds.isDefined) { if (windowBounds.get.isEventTime) { - // we cannot handle event-time window joins yet - false + val procTimeAttrInOutput = join.getRowType.getFieldList.asScala + .exists(f => FlinkTypeFactory.isProctimeIndicatorType(f.getType)) + + !remainingPredsAccessTime && !procTimeAttrInOutput --- End diff -- No, the problem is that these rowtime attributes after a proc-time join won't be aligned with the watermarks anymore. We would need to hold back watermarks based on the data in the caches and not based on the window boundaries. Keeping the proctime attributes is fine because they are not bound to watermarks.
---