Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4625#discussion_r137225205
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala ---
    @@ -55,8 +55,10 @@ class DataStreamWindowJoinRule
     
         if (windowBounds.isDefined) {
           if (windowBounds.get.isEventTime) {
    -        // we cannot handle event-time window joins yet
    -        false
    +        val procTimeAttrInOutput = join.getRowType.getFieldList.asScala
    +          .exists(f => FlinkTypeFactory.isProctimeIndicatorType(f.getType))
    +
    +        !remainingPredsAccessTime && !procTimeAttrInOutput
    --- End diff --
    
    No, the problem is that rowtime attributes emitted by a proc-time join 
are no longer aligned with the watermarks: a cached row can be joined and 
emitted after the watermark has already passed its rowtime. We would need to 
hold back watermarks based on the data in the caches rather than on the 
window boundaries. 
    
    Keeping the proctime attributes is fine because they are not bound to 
watermarks.
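
    To make the hold-back idea concrete, here is a minimal sketch in plain 
Scala, independent of Flink. `HoldBackTracker` and its methods are 
hypothetical names for illustration, not existing Flink classes. It assumes 
the usual watermark meaning (a watermark `w` promises no further rows with 
rowtime <= `w`) and that the join operator registers every row it caches.

```scala
import scala.collection.immutable.SortedMap

object WatermarkHoldBackSketch {

  /** Hypothetical helper: tracks the rowtimes of rows held in the join caches. */
  final class HoldBackTracker {
    // rowtime -> number of cached rows carrying that rowtime
    private var cached: SortedMap[Long, Int] = SortedMap.empty[Long, Int]

    /** Call when a row is put into a cache. */
    def add(rowtime: Long): Unit =
      cached = cached + (rowtime -> (cached.getOrElse(rowtime, 0) + 1))

    /** Call when a row is evicted from a cache. */
    def remove(rowtime: Long): Unit =
      cached.get(rowtime).foreach { n =>
        cached = if (n <= 1) cached - rowtime else cached + (rowtime -> (n - 1))
      }

    /**
     * Watermark that may safely be forwarded downstream: never ahead of the
     * input watermark, and strictly below the smallest cached rowtime, since
     * a cached row may still be emitted with that rowtime.
     */
    def outputWatermark(inputWatermark: Long): Long =
      cached.headOption match {
        case Some((minRowtime, _)) => math.min(inputWatermark, minRowtime - 1)
        case None                  => inputWatermark
      }
  }
}
```

    With rows at rowtime 50 and 100 in the caches, an input watermark of 200 
would be held back to 49, and it only advances once those rows are evicted. 
Window boundaries play no role here, which is the difference from what the 
rule currently assumes.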

