Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5140#discussion_r159023858
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
    @@ -142,50 +143,47 @@ class DataStreamWindowJoin(
             s"${joinConditionToString(schema.relDataType, joinCondition, 
getExpressionString)}), " +
             s"join: (${joinSelectionToString(schema.relDataType)})"
     
    -    joinType match {
    -      case JoinRelType.INNER =>
    -        if (relativeWindowSize < 0) {
    -          LOG.warn(s"The relative window size $relativeWindowSize is 
negative," +
    -            " please check the join conditions.")
    -          createEmptyInnerJoin(leftDataStream, rightDataStream, 
returnTypeInfo)
    -        } else {
    -          if (isRowTime) {
    -            createRowTimeInnerJoin(
    -              leftDataStream,
    -              rightDataStream,
    -              returnTypeInfo,
    -              joinOpName,
    -              joinFunction.name,
    -              joinFunction.code,
    -              leftKeys,
    -              rightKeys
    -            )
    -          } else {
    -            createProcTimeInnerJoin(
    -              leftDataStream,
    -              rightDataStream,
    -              returnTypeInfo,
    -              joinOpName,
    -              joinFunction.name,
    -              joinFunction.code,
    -              leftKeys,
    -              rightKeys
    -            )
    -          }
    -        }
    -      case JoinRelType.FULL =>
    -        throw new TableException(
    -          "Full join between stream and stream is not supported yet.")
    -      case JoinRelType.LEFT =>
    -        throw new TableException(
    -          "Left join between stream and stream is not supported yet.")
    -      case JoinRelType.RIGHT =>
    -        throw new TableException(
    -          "Right join between stream and stream is not supported yet.")
    +    val flinkJoinType = joinType match {
    +      case JoinRelType.INNER => JoinType.INNER
    +      case JoinRelType.FULL => JoinType.FULL_OUTER
    +      case JoinRelType.LEFT => JoinType.LEFT_OUTER
    +      case JoinRelType.RIGHT => JoinType.RIGHT_OUTER
    +    }
    +
    +    if (relativeWindowSize < 0) {
    +      LOG.warn(s"The relative window size $relativeWindowSize is 
negative," +
    +        " please check the join conditions.")
    +      createEmptyJoin(leftDataStream, rightDataStream, returnTypeInfo)
    --- End diff --
    
    Yes, your are right. I'll add this part.


---

Reply via email to