Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r141993041 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", - BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( + timeIndicator + "InnerJoinLeftCache", + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], + leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", - BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( + timeIndicator + "InnerJoinRightCache", + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], + rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", - classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", - classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** - * Process records from the left stream. - * - * @param cRowValue the input record - * @param ctx the context to register timer or get current time - * @param out the collector for outputting results - * + * Process rows from the left stream. */ override def processElement1( - cRowValue: CRow, - ctx: CoProcessFunction[CRow, CRow, CRow]#Context, - out: Collector[CRow]): Unit = { - val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) - getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { + updateOperatorTime(ctx) + val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) + val oppositeLowerBound: Long = rowTime - rightRelativeSize + val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** - * Process records from the right stream. - * - * @param cRowValue the input record - * @param ctx the context to get current time - * @param out the collector for outputting results - * + * Process rows from the right stream. */ override def processElement2( - cRowValue: CRow, - ctx: CoProcessFunction[CRow, CRow, CRow]#Context, - out: Collector[CRow]): Unit = { - val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) - getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { + updateOperatorTime(ctx) + val rowTime: Long = getTimeForRightStream(ctx, cRowValue) + val oppositeLowerBound: Long = rowTime - leftRelativeSize + val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime, leftTimerState, rightCache, leftCache, - false + leftRow = false ) } /** - * Put a record from the input stream into the cache and iterate the opposite cache to - * output records meeting the join conditions. If there is no timer set for the OPPOSITE + * Put a row from the input stream into the cache and iterate the opposite cache to + * output join results meeting the conditions. If there is no timer set for the OPPOSITE * STREAM, register one. */ private def processElement( - cRowValue: CRow, - timeForRecord: Long, - ctx: CoProcessFunction[CRow, CRow, CRow]#Context, - out: Collector[CRow], - myWatermark: Long, - oppositeWatermark: Long, - oppositeTimeState: ValueState[Long], - recordListCache: MapState[Long, JList[Row]], - oppositeCache: MapState[Long, JList[Row]], - leftRecord: Boolean): Unit = { - if (relativeWindowSize > 0) { - //TODO Shall we consider adding a method for initialization with the context and collector? - cRowWrapper.out = out - - val record = cRowValue.row - - //TODO Only if the time of the record is greater than the watermark, can we continue. - if (timeForRecord >= myWatermark - allowedLateness) { - val oppositeLowerBound: Long = - if (leftRecord) timeForRecord - rightRelativeSize else timeForRecord - leftRelativeSize - - val oppositeUpperBound: Long = - if (leftRecord) timeForRecord + leftRelativeSize else timeForRecord + rightRelativeSize - - // Put the record into the cache for later use. - val recordList = if (recordListCache.contains(timeForRecord)) { - recordListCache.get(timeForRecord) - } else { - new util.ArrayList[Row]() - } - recordList.add(record) - recordListCache.put(timeForRecord, recordList) - - // Register a timer on THE OTHER STREAM to remove records from the cache once they are - // expired. - if (oppositeTimeState.value == 0) { - registerCleanUpTimer( - ctx, timeForRecord, oppositeWatermark, oppositeTimeState, leftRecord, true) - } + cRowValue: CRow, + timeForRow: Long, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + myWatermark: Long, + oppositeLowerBound: Long, + oppositeUpperBound: Long, + oppositeWatermark: Long, + oppositeTimeState: ValueState[Long], + rowListCache: MapState[Long, JList[Row]], + oppositeCache: MapState[Long, JList[Row]], + leftRow: Boolean): Unit = { + cRowWrapper.out = out + val row = cRowValue.row + if (!checkRowOutOfDate(timeForRow, myWatermark)) { --- End diff -- Well, that makes sense to me. To achieve it, I think we should compare the **real lowest timestamp** in the opposite cache with the `oppositeQualifiedUpperBound`, i.e., ``` if (upperWindowBound > otherOpTime - allowedLateness) { // store record } if (upperWindowBound > realLowestTimestamp) { // join record } ``` I'll update the implementation and the tests.
---