Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4625#discussion_r143400052

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---
    @@ -184,4 +229,50 @@ class DataStreamWindowJoin(
               .returns(returnTypeInfo)
           }
         }
    +
    +  def createRowTimeInnerJoin(
    +      leftDataStream: DataStream[CRow],
    +      rightDataStream: DataStream[CRow],
    +      returnTypeInfo: TypeInformation[CRow],
    +      joinFunctionName: String,
    +      joinFunctionCode: String,
    +      leftKeys: Array[Int],
    +      rightKeys: Array[Int]): DataStream[CRow] = {
    +
    +    val rowTimeInnerJoinFunc = new RowTimeBoundedStreamInnerJoin(
    +      leftLowerBound,
    +      leftUpperBound,
    +      allowedLateness = 0L,
    +      leftSchema.typeInfo,
    +      rightSchema.typeInfo,
    +      joinFunctionName,
    +      joinFunctionCode,
    +      leftTimeIdx,
    +      rightTimeIdx)
    +
    +    if (!leftKeys.isEmpty) {
    +      leftDataStream
    +        .connect(rightDataStream)
    +        .keyBy(leftKeys, rightKeys)
    --- End diff --

    That's a good point. The motivation for this restriction is to prevent nested-loop joins in batch execution.
    In the streaming window join case, a join without equi-join keys would be executed by a single thread, which is not efficient either.
    It could be parallelized by broadcasting one of the inputs, but without statistics this is quite risky.
    Still, it is not as bad as a full nested-loop join, because the window predicates bound the computation.

    We could add a boolean flag to the constructor of `FlinkLogicalJoinConverter` to allow cross joins.
    Right now, the set of logical optimization rules is shared by DataSet and DataStream.
    I'd keep the shared rule set for now and just add an additional rule by overriding `getLogicalOptRuleSet` in `StreamTableEnvironment`.
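To make the trade-off concrete, here is a minimal pure-Scala sketch with no Flink dependencies. It is not how `RowTimeBoundedStreamInnerJoin` is implemented; `Row`, `WindowJoinSketch`, `partitionOf`, `windowJoin`, `canEvictLeft`, and `canEvictRight` are hypothetical names. It assumes the window predicate `right.time + lowerBound <= left.time <= right.time + upperBound` with zero allowed lateness. It shows two things: without equi-join keys every row is routed to one partition (a single thread), and the window bounds let buffered rows be dropped once the watermark passes them, which is why a keyless streaming window join is still bounded, unlike a batch nested-loop join.

```scala
// Hypothetical sketch only; not Flink's RowTimeBoundedStreamInnerJoin.
// A row with an equi-join key and an event timestamp (ms).
final case class Row(key: String, time: Long, value: String)

object WindowJoinSketch {

  // Assumed window predicate: right.time + lowerBound <= left.time <= right.time + upperBound.
  def inWindow(l: Row, r: Row, lowerBound: Long, upperBound: Long): Boolean =
    l.time >= r.time + lowerBound && l.time <= r.time + upperBound

  // Route a row to a parallel instance: hash the key if equi-join keys exist,
  // otherwise every row must go to the same instance (index 0).
  def partitionOf(row: Row, useKeys: Boolean, parallelism: Int): Int =
    if (useKeys) Math.floorMod(row.key.hashCode, parallelism) else 0

  // Join both inputs partition by partition; yields (partition, left value, right value).
  def windowJoin(
      left: Seq[Row],
      right: Seq[Row],
      lowerBound: Long,
      upperBound: Long,
      useKeys: Boolean,
      parallelism: Int): Seq[(Int, String, String)] = {
    val leftParts = left.groupBy(partitionOf(_, useKeys, parallelism))
    val rightParts = right.groupBy(partitionOf(_, useKeys, parallelism))
    for {
      (p, ls) <- leftParts.toSeq.sortBy(_._1)
      rs = rightParts.getOrElse(p, Seq.empty)
      l <- ls
      r <- rs
      if (!useKeys || l.key == r.key) && inWindow(l, r, lowerBound, upperBound)
    } yield (p, l.value, r.value)
  }

  // A buffered left row can only match right rows with time <= l.time - lowerBound,
  // so it can be evicted once the watermark reaches that point (no allowed lateness).
  def canEvictLeft(l: Row, watermark: Long, lowerBound: Long): Boolean =
    watermark >= l.time - lowerBound

  // A buffered right row can only match left rows with time <= r.time + upperBound.
  def canEvictRight(r: Row, watermark: Long, upperBound: Long): Boolean =
    watermark >= r.time + upperBound
}
```

For example, with `lowerBound = -5` and `upperBound = 5`, a right row at time 12 can be evicted once the watermark reaches 17, so the per-instance state stays bounded even when all rows share a single partition.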
---