Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184995707 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -176,14 +179,34 @@ class DataStreamJoin( body, returnType) - val coMapFun = - new NonWindowInnerJoin( - leftSchema.typeInfo, - rightSchema.typeInfo, - CRowTypeInfo(returnType), - genFunction.name, - genFunction.code, - queryConfig) + val coMapFun = joinType match { + case JoinRelType.INNER => + new NonWindowInnerJoin( + leftSchema.typeInfo, + rightSchema.typeInfo, + CRowTypeInfo(returnType), + genFunction.name, + genFunction.code, + queryConfig) + case JoinRelType.LEFT if joinInfo.isEqui => + new NonWindowLeftRightJoin( + leftSchema.typeInfo, + rightSchema.typeInfo, + CRowTypeInfo(returnType), + genFunction.name, + genFunction.code, + joinType == JoinRelType.LEFT, + queryConfig) + case JoinRelType.LEFT => --- End diff -- I planed to add right join in FLINK-8429. It's ok to add right join in this pr if you prefer.
---