Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4625#discussion_r143400052
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
    @@ -184,4 +229,50 @@ class DataStreamWindowJoin(
             .returns(returnTypeInfo)
         }
       }
    +
    +  def createRowTimeInnerJoin(
    +      leftDataStream: DataStream[CRow],
    +      rightDataStream: DataStream[CRow],
    +      returnTypeInfo: TypeInformation[CRow],
    +      joinFunctionName: String,
    +      joinFunctionCode: String,
    +      leftKeys: Array[Int],
    +      rightKeys: Array[Int]): DataStream[CRow] = {
    +
    +    val rowTimeInnerJoinFunc = new RowTimeBoundedStreamInnerJoin(
    +      leftLowerBound,
    +      leftUpperBound,
    +      allowedLateness = 0L,
    +      leftSchema.typeInfo,
    +      rightSchema.typeInfo,
    +      joinFunctionName,
    +      joinFunctionCode,
    +      leftTimeIdx,
    +      rightTimeIdx)
    +
    +    if (!leftKeys.isEmpty) {
    +      leftDataStream
    +        .connect(rightDataStream)
    +        .keyBy(leftKeys, rightKeys)
    --- End diff --
    
    That's a good point. The motivation for this restriction is to prevent 
nested-loop joins for the batch execution. 
    In the streaming window join case, no equi-join keys would result in a 
single thread execution which is not efficient either (could be parallelized by 
broadcasting one of the inputs but without statistics this is quite risky) but 
not as bad as a full nested-loop join because we can bound the computation due 
to the window predicates.
    
    We could add a boolean flag to the constructor of 
`FlinkLogicalJoinConverter` to allow cross joins. Right now the set of logical 
optimization rules is shared by DataSet and DataStream. I'd keep the joint rule 
set for now and just add an additional rule by overriding 
`getLogicalOptRuleSet` in `StreamExecutionEnvironment`. 


---

Reply via email to