Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139435309 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", - BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( + timeIndicator + "InnerJoinLeftCache", + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], + leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", - BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( + timeIndicator + "InnerJoinRightCache", + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], + rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", - classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", - classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** - * Process records from the left stream. - * - * @param cRowValue the input record - * @param ctx the context to register timer or get current time - * @param out the collector for outputting results - * + * Process rows from the left stream. */ override def processElement1( - cRowValue: CRow, - ctx: CoProcessFunction[CRow, CRow, CRow]#Context, - out: Collector[CRow]): Unit = { - val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) - getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { + updateOperatorTime(ctx) + val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) + val oppositeLowerBound: Long = rowTime - rightRelativeSize + val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** - * Process records from the right stream. - * - * @param cRowValue the input record - * @param ctx the context to get current time - * @param out the collector for outputting results - * + * Process rows from the right stream. 
*/ override def processElement2( - cRowValue: CRow, - ctx: CoProcessFunction[CRow, CRow, CRow]#Context, - out: Collector[CRow]): Unit = { - val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) - getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { + updateOperatorTime(ctx) + val rowTime: Long = getTimeForRightStream(ctx, cRowValue) + val oppositeLowerBound: Long = rowTime - leftRelativeSize + val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime, leftTimerState, rightCache, leftCache, - false + leftRow = false ) } /** - * Put a record from the input stream into the cache and iterate the opposite cache to - * output records meeting the join conditions. If there is no timer set for the OPPOSITE + * Put a row from the input stream into the cache and iterate the opposite cache to + * output join results meeting the conditions. If there is no timer set for the OPPOSITE * STREAM, register one. */ private def processElement( - cRowValue: CRow, - timeForRecord: Long, - ctx: CoProcessFunction[CRow, CRow, CRow]#Context, - out: Collector[CRow], - myWatermark: Long, - oppositeWatermark: Long, - oppositeTimeState: ValueState[Long], - recordListCache: MapState[Long, JList[Row]], - oppositeCache: MapState[Long, JList[Row]], - leftRecord: Boolean): Unit = { - if (relativeWindowSize > 0) { - //TODO Shall we consider adding a method for initialization with the context and collector? - cRowWrapper.out = out - - val record = cRowValue.row - - //TODO Only if the time of the record is greater than the watermark, can we continue. 
- if (timeForRecord >= myWatermark - allowedLateness) { - val oppositeLowerBound: Long = - if (leftRecord) timeForRecord - rightRelativeSize else timeForRecord - leftRelativeSize - - val oppositeUpperBound: Long = - if (leftRecord) timeForRecord + leftRelativeSize else timeForRecord + rightRelativeSize - - // Put the record into the cache for later use. - val recordList = if (recordListCache.contains(timeForRecord)) { - recordListCache.get(timeForRecord) - } else { - new util.ArrayList[Row]() - } - recordList.add(record) - recordListCache.put(timeForRecord, recordList) - - // Register a timer on THE OTHER STREAM to remove records from the cache once they are - // expired. - if (oppositeTimeState.value == 0) { - registerCleanUpTimer( - ctx, timeForRecord, oppositeWatermark, oppositeTimeState, leftRecord, true) - } + cRowValue: CRow, + timeForRow: Long, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + myWatermark: Long, + oppositeLowerBound: Long, + oppositeUpperBound: Long, + oppositeWatermark: Long, + oppositeTimeState: ValueState[Long], + rowListCache: MapState[Long, JList[Row]], + oppositeCache: MapState[Long, JList[Row]], + leftRow: Boolean): Unit = { + cRowWrapper.out = out + val row = cRowValue.row + if (!checkRowOutOfDate(timeForRow, myWatermark)) { + // Put the row into the cache for later use. + var rowList = rowListCache.get(timeForRow) + if (null == rowList) { + rowList = new ArrayList[Row](1) + } + rowList.add(row) + rowListCache.put(timeForRow, rowList) + // Register a timer on THE OPPOSITE STREAM to remove rows from the cache once they are + // expired. + if (oppositeTimeState.value == 0) { + registerCleanUpTimer( + ctx, timeForRow, oppositeWatermark, oppositeTimeState, leftRow, firstTimer = true) + } - // Join the record with records from the opposite stream. 
- val oppositeIterator = oppositeCache.iterator() - var oppositeEntry: Entry[Long, util.List[Row]] = null - var oppositeTime: Long = 0L; - while (oppositeIterator.hasNext) { - oppositeEntry = oppositeIterator.next - oppositeTime = oppositeEntry.getKey - if (oppositeTime < oppositeLowerBound - allowedLateness) { - //TODO Considering the data out-of-order, we should not remove records here. - } else if (oppositeTime >= oppositeLowerBound && oppositeTime <= oppositeUpperBound) { - val oppositeRows = oppositeEntry.getValue - var i = 0 - if (leftRecord) { - while (i < oppositeRows.size) { - joinFunction.join(record, oppositeRows.get(i), cRowWrapper) - i += 1 - } - } else { - while (i < oppositeRows.size) { - joinFunction.join(oppositeRows.get(i), record, cRowWrapper) - i += 1 - } + // Join the row with rows from the opposite stream. + val oppositeIterator = oppositeCache.iterator() + while (oppositeIterator.hasNext) { + val oppositeEntry = oppositeIterator.next + val oppositeTime = oppositeEntry.getKey + if (oppositeTime >= oppositeLowerBound && oppositeTime <= oppositeUpperBound) { + val oppositeRows = oppositeEntry.getValue + var i = 0 + if (leftRow) { + while (i < oppositeRows.size) { + joinFunction.join(row, oppositeRows.get(i), cRowWrapper) + i += 1 + } + } else { + while (i < oppositeRows.size) { + joinFunction.join(oppositeRows.get(i), row, cRowWrapper) + i += 1 } - } else if (oppositeTime > oppositeUpperBound) { - //TODO If the keys are ordered, can we break here? } } - } else { - //TODO Need some extra logic here? - LOG.warn(s"$record is out-of-date.") + // We could do the short-cutting optimization here once we get a state with ordered keys. } } + // We need to deal with the late data in the future. } /** - * Register a timer for cleaning up records in a specified time. + * Register a timer for cleaning up rows in a specified time. 
* * @param ctx the context to register timer - * @param timeForRecord time for the input record + * @param rowTime time for the input row * @param oppositeWatermark watermark of the opposite stream * @param timerState stores the timestamp for the next timer - * @param leftRecord record from the left or the right stream + * @param leftRow whether this row comes from the left stream * @param firstTimer whether this is the first timer */ private def registerCleanUpTimer( - ctx: CoProcessFunction[CRow, CRow, CRow]#Context, - timeForRecord: Long, - oppositeWatermark: Long, - timerState: ValueState[Long], - leftRecord: Boolean, - firstTimer: Boolean): Unit = { - val cleanUpTime = timeForRecord + (if (leftRecord) leftRelativeSize else rightRelativeSize) + - allowedLateness + 1 - registerTimer(ctx, !leftRecord, cleanUpTime) - LOG.debug(s"Register a clean up timer on the ${if (leftRecord) "RIGHT" else "LEFT"} state:" - + s" timeForRecord = ${timeForRecord}, cleanUpTime = ${cleanUpTime}, oppositeWatermark = " + - s"${oppositeWatermark}") - timerState.update(cleanUpTime) - if (cleanUpTime <= oppositeWatermark + allowedLateness && firstTimer) { - backPressureSuggestion = - if (leftRecord) (oppositeWatermark + allowedLateness - cleanUpTime) - else -(oppositeWatermark + allowedLateness - cleanUpTime) - LOG.warn("The clean timer for the " + - s"${if (leftRecord) "left" else "right"}" + - s" stream is lower than ${if (leftRecord) "right" else "left"} watermark." 
+ - s" requiredTime = ${formatTime(cleanUpTime)}, watermark = ${formatTime(oppositeWatermark)}," - + s"backPressureSuggestion = " + s"${backPressureSuggestion}.") + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + rowTime: Long, + oppositeWatermark: Long, + timerState: ValueState[Long], + leftRow: Boolean, + firstTimer: Boolean): Unit = { + val cleanupTime = if (leftRow) { + rowTime + leftRelativeSize + cleanupDelay + allowedLateness + 1 + } else { + rowTime + rightRelativeSize + cleanupDelay + allowedLateness + 1 } + registerTimer(ctx, !leftRow, cleanupTime) + LOG.debug(s"Register a clean up timer on the ${if (leftRow) "RIGHT" else "LEFT"} state:" + + s" timeForRow = ${rowTime}, cleanupTime should be ${cleanupTime - cleanupDelay}," + + s" but delayed to ${cleanupTime}," + + s" oppositeWatermark = ${oppositeWatermark}") + timerState.update(cleanupTime) + //if cleanupTime <= oppositeWatermark + allowedLateness && firstTimer, we may set the + // backPressureSuggestion = + // if (leftRow) (oppositeWatermark + allowedLateness - cleanupTime) + // else -(oppositeWatermark + allowedLateness - cleanupTime) } - /** * Called when a registered timer is fired. - * Remove records which are earlier than the expiration time, - * and register a new timer for the earliest remaining records. + * Remove rows whose timestamps are earlier than the expiration time, + * and register a new timer for the remaining rows. * * @param timestamp the timestamp of the timer * @param ctx the context to register timer or get current time * @param out the collector for returning result values */ override def onTimer( - timestamp: Long, - ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext, - out: Collector[CRow]): Unit = { - getCurrentOperatorTime(ctx) - //TODO In the future, we should separate the left and right watermarks. Otherwise, the - //TODO registered timer of the faster stream will be delayed, even if the watermarks have - //TODO already been emitted by the source. 
+ timestamp: Long, + ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext, + out: Collector[CRow]): Unit = { + updateOperatorTime(ctx) + // In the future, we should separate the left and right watermarks. Otherwise, the + // registered timer of the faster stream will be delayed, even if the watermarks have + // already been emitted by the source. if (leftTimerState.value == timestamp) { val rightExpirationTime = leftOperatorTime - rightRelativeSize - allowedLateness - 1 - removeExpiredRecords( - timestamp, + removeExpiredRows( rightExpirationTime, leftOperatorTime, rightCache, leftTimerState, ctx, - false + removeLeft = false ) } if (rightTimerState.value == timestamp) { val leftExpirationTime = rightOperatorTime - leftRelativeSize - allowedLateness - 1 - removeExpiredRecords( - timestamp, + removeExpiredRows( leftExpirationTime, rightOperatorTime, leftCache, rightTimerState, ctx, - true + removeLeft = true ) } } /** - * Remove the expired records. Register a new timer if the cache still holds records + * Remove the expired rows. Register a new timer if the cache still holds valid rows * after the cleaning up. 
+ * + * @param expirationTime the expiration time for this cache + * @param oppositeWatermark the watermark of the opposite stream + * @param rowCache the row cache + * @param timerState timer state for the opposite stream + * @param ctx the context to register the cleanup timer + * @param removeLeft whether to remove the left rows */ - private def removeExpiredRecords( - timerFiringTime: Long, - expirationTime: Long, - oppositeWatermark: Long, - recordCache: MapState[Long, JList[Row]], - timerState: ValueState[Long], - ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext, - removeLeft: Boolean): Unit = { + private def removeExpiredRows( + expirationTime: Long, + oppositeWatermark: Long, + rowCache: MapState[Long, JList[Row]], + timerState: ValueState[Long], + ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext, + removeLeft: Boolean): Unit = { - val keysIterator = recordCache.keys().iterator() + val keysIterator = rowCache.keys().iterator() // Search for expired timestamps. // If we find a non-expired timestamp, remember the timestamp and leave the loop. // This way we find all expired timestamps if they are sorted without doing a full pass. var earliestTimestamp: Long = -1L - var recordTime: Long = 0L + var rowTime: Long = 0L while (keysIterator.hasNext) { - //TODO The "short-circuit" code was commented, because when using a StateMap with - //TODO unordered keys, the cache will grow indefinitely! - // && earliestTimestamp < 0) { - recordTime = keysIterator.next - if (recordTime <= expirationTime) { - // TODO Not sure if we can remove records directly. + rowTime = keysIterator.next + if (rowTime <= expirationTime) { keysIterator.remove() } else { // We find the earliest timestamp that is still valid. 
- if (recordTime < earliestTimestamp || earliestTimestamp < 0) { - earliestTimestamp = recordTime + if (rowTime < earliestTimestamp || earliestTimestamp < 0) { + earliestTimestamp = rowTime } } } // If the cache contains non-expired timestamps, register a new timer. // Otherwise clear the states. if (earliestTimestamp > 0) { - registerCleanUpTimer(ctx, earliestTimestamp, oppositeWatermark, timerState, removeLeft, false) + registerCleanUpTimer( + ctx, + earliestTimestamp, + oppositeWatermark, + timerState, + removeLeft, + firstTimer = false) } else { // The timerState will be 0. timerState.clear() - recordCache.clear() + rowCache.clear() } } /** - * Get the operator times of the two streams. + * Check if the row is out of date. + * + * @param timeForRow time of the row + * @param watermark watermark for the stream + * @return true if the row is out of date; false otherwise + */ + def checkRowOutOfDate(timeForRow: Long, watermark: Long): Boolean --- End diff -- How about renaming this method to `isRowTooLate()`?
---