[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174815#comment-16174815 ]
ASF GitHub Bot commented on FLINK-6233: --------------------------------------- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140251765 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", - BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( + timeIndicator + "InnerJoinLeftCache", + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], + leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", - BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( + timeIndicator + "InnerJoinRightCache", + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], + rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. 
val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", - classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", - classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** - * Process records from the left stream. - * - * @param cRowValue the input record - * @param ctx the context to register timer or get current time - * @param out the collector for outputting results - * + * Process rows from the left stream. */ override def processElement1( - cRowValue: CRow, - ctx: CoProcessFunction[CRow, CRow, CRow]#Context, - out: Collector[CRow]): Unit = { - val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) - getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { + updateOperatorTime(ctx) + val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) + val oppositeLowerBound: Long = rowTime - rightRelativeSize + val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** - * Process records from the right stream. - * - * @param cRowValue the input record - * @param ctx the context to get current time - * @param out the collector for outputting results - * + * Process rows from the right stream. 
*/ override def processElement2( - cRowValue: CRow, - ctx: CoProcessFunction[CRow, CRow, CRow]#Context, - out: Collector[CRow]): Unit = { - val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) - getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { + updateOperatorTime(ctx) + val rowTime: Long = getTimeForRightStream(ctx, cRowValue) + val oppositeLowerBound: Long = rowTime - leftRelativeSize + val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime, leftTimerState, rightCache, leftCache, - false + leftRow = false ) } /** - * Put a record from the input stream into the cache and iterate the opposite cache to - * output records meeting the join conditions. If there is no timer set for the OPPOSITE + * Put a row from the input stream into the cache and iterate the opposite cache to + * output join results meeting the conditions. If there is no timer set for the OPPOSITE * STREAM, register one. */ private def processElement( - cRowValue: CRow, - timeForRecord: Long, - ctx: CoProcessFunction[CRow, CRow, CRow]#Context, - out: Collector[CRow], - myWatermark: Long, - oppositeWatermark: Long, - oppositeTimeState: ValueState[Long], - recordListCache: MapState[Long, JList[Row]], - oppositeCache: MapState[Long, JList[Row]], - leftRecord: Boolean): Unit = { - if (relativeWindowSize > 0) { - //TODO Shall we consider adding a method for initialization with the context and collector? - cRowWrapper.out = out - - val record = cRowValue.row - - //TODO Only if the time of the record is greater than the watermark, can we continue. 
- if (timeForRecord >= myWatermark - allowedLateness) { - val oppositeLowerBound: Long = - if (leftRecord) timeForRecord - rightRelativeSize else timeForRecord - leftRelativeSize - - val oppositeUpperBound: Long = - if (leftRecord) timeForRecord + leftRelativeSize else timeForRecord + rightRelativeSize - - // Put the record into the cache for later use. - val recordList = if (recordListCache.contains(timeForRecord)) { - recordListCache.get(timeForRecord) - } else { - new util.ArrayList[Row]() - } - recordList.add(record) - recordListCache.put(timeForRecord, recordList) - - // Register a timer on THE OTHER STREAM to remove records from the cache once they are - // expired. - if (oppositeTimeState.value == 0) { - registerCleanUpTimer( - ctx, timeForRecord, oppositeWatermark, oppositeTimeState, leftRecord, true) - } + cRowValue: CRow, + timeForRow: Long, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + myWatermark: Long, + oppositeLowerBound: Long, + oppositeUpperBound: Long, + oppositeWatermark: Long, + oppositeTimeState: ValueState[Long], + rowListCache: MapState[Long, JList[Row]], + oppositeCache: MapState[Long, JList[Row]], + leftRow: Boolean): Unit = { + cRowWrapper.out = out + val row = cRowValue.row + if (!checkRowOutOfDate(timeForRow, myWatermark)) { --- End diff -- Yes, we must include the `allowedLateness` in all the conditions. However, I think the "storing condition" should be covered by the "lateness condition", i.e., ``` if (myRecord > myOpTime - allowedLateness) { if (upperWindowBound > otherOpTime - allowedLateness) { // store record } // join record } ``` That's because once we store a record `R` which doesn't pass the lateness check, it may be joined with a later-arriving record from the opposite stream. Since `R` was not joined when it arrived, it should never be joined afterwards, so that the results stay consistent. 
> Support rowtime inner equi-join between two streams in the SQL API > ------------------------------------------------------------------ > > Key: FLINK-6233 > URL: https://issues.apache.org/jira/browse/FLINK-6233 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: hongyuhong > Assignee: Xingcan Cui > > The goal of this issue is to add support for inner equi-join on rowtime > streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime > FROM Orders AS o > JOIN Shipments AS s > ON o.orderId = s.orderId > AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR; > {code} > The following restrictions should initially apply: > * The join hint only supports inner join > * The ON clause should include an equi-join condition > * The time-condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL > '1' HOUR}} can only use a rowtime that is a system attribute; the time > condition only supports bounded time ranges like {{o.rowtime BETWEEN s.rowtime > - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not > unbounded ones like {{o.rowtime < s.rowtime}}, and should include both > streams' rowtime attributes; {{o.rowtime between rowtime () and rowtime () + > 1}} should also not be supported. > A row-time stream join will not be able to handle late data, because this > would mean inserting a row into a sorted order and shifting all other computations. > This would be too expensive to maintain. Therefore, we will throw an error if > a user tries to use a row-time stream join with late data handling. > This issue includes: > * Design of the DataStream operator to deal with stream join > * Translation from Calcite's RelNode representation (LogicalJoin). -- This message was sent by Atlassian JIRA (v6.4.14#64029)