[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311794#comment-16311794 ]
ASF GitHub Bot commented on FLINK-7797: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159719430 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala --- @@ -183,23 +190,48 @@ class DataStreamWindowJoin( } } - def createEmptyJoin( + def createNegativeWindowSizeJoin( --- End diff -- I think we can make this even more efficient if we implement this as: ``` def createNegativeWindowSizeJoin( joinType: JoinType, leftInput: DataStream[CRow], rightInput: DataStream[CRow], leftArity: Int, rightArity: Int, returnType: TypeInformation[CRow]): DataStream[CRow] = { // we filter all records instead of adding an empty source to preserve the watermarks val allFilter = new FlatMapFunction[CRow, CRow] with ResultTypeQueryable[CRow] { override def flatMap(value: CRow, out: Collector[CRow]): Unit = { } override def getProducedType: TypeInformation[CRow] = returnType } val leftPadder = new MapFunction[CRow, CRow] with ResultTypeQueryable[CRow] { val paddingUtil = new OuterJoinPaddingUtil(leftArity, rightArity) override def map(value: CRow): CRow = new CRow(paddingUtil.padLeft(value.row), true) override def getProducedType: TypeInformation[CRow] = returnType } val rightPadder = new MapFunction[CRow, CRow] with ResultTypeQueryable[CRow] { val paddingUtil = new OuterJoinPaddingUtil(leftArity, rightArity) override def map(value: CRow): CRow = new CRow(paddingUtil.padRight(value.row), true) override def getProducedType: TypeInformation[CRow] = returnType } val leftP = leftInput.getParallelism val rightP = rightInput.getParallelism joinType match { case JoinType.INNER => leftInput.flatMap(allFilter).name("Empty Inner Join").setParallelism(leftP) .union(rightInput.flatMap(allFilter).name("Empty Inner Join").setParallelism(rightP)) case JoinType.LEFT_OUTER => leftInput.map(leftPadder).name("Left Outer Join").setParallelism(leftP) .union(rightInput.flatMap(allFilter).name("Left Outer Join").setParallelism(rightP)) case JoinType.RIGHT_OUTER => leftInput.flatMap(allFilter).name("Right Outer Join").setParallelism(leftP) .union(rightInput.map(rightPadder).name("Right Outer Join").setParallelism(rightP)) case JoinType.FULL_OUTER => leftInput.map(leftPadder).name("Full Outer Join").setParallelism(leftP) .union(rightInput.map(rightPadder).name("Full Outer Join").setParallelism(rightP)) } } ``` We also need to make `OuterJoinPaddingUtil` extend `java.io.Serializable` for this. > Add support for windowed outer joins for streaming tables > --------------------------------------------------------- > > Key: FLINK-7797 > URL: https://issues.apache.org/jira/browse/FLINK-7797 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Affects Versions: 1.4.0 > Reporter: Fabian Hueske > Assignee: Xingcan Cui > > Currently, only windowed inner joins for streaming tables are supported. > This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER > joins. -- This message was sent by Atlassian JIRA (v6.4.14#64029)