Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r188531785

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala ---
    @@ -176,14 +179,34 @@ class DataStreamJoin(
           body,
           returnType)

    -    val coMapFun =
    -      new NonWindowInnerJoin(
    -        leftSchema.typeInfo,
    -        rightSchema.typeInfo,
    -        CRowTypeInfo(returnType),
    -        genFunction.name,
    -        genFunction.code,
    -        queryConfig)
    +    val coMapFun = joinType match {
    +      case JoinRelType.INNER =>
    +        new NonWindowInnerJoin(
    +          leftSchema.typeInfo,
    +          rightSchema.typeInfo,
    +          CRowTypeInfo(returnType),
    +          genFunction.name,
    +          genFunction.code,
    +          queryConfig)
    +      case JoinRelType.LEFT if joinInfo.isEqui =>
    +        new NonWindowLeftRightJoin(
    +          leftSchema.typeInfo,
    +          rightSchema.typeInfo,
    +          CRowTypeInfo(returnType),
    +          genFunction.name,
    +          genFunction.code,
    +          joinType == JoinRelType.LEFT,
    +          queryConfig)
    +      case JoinRelType.LEFT =>
    --- End diff --

    We can also do it as part of FLINK-8429.
---