asfgit closed pull request #6918: [FLINK-10543][table] Leverage efficient timer deletion in relational operators (URL: https://github.com/apache/flink/pull/6918)
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CleanupState.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CleanupState.scala new file mode 100644 index 00000000000..d9c8e2ccaee --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CleanupState.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import java.lang.{Long => JLong} + +import org.apache.flink.streaming.api.TimerService + +/** + * Base class for clean up state, both for [[ProcessFunction]] and [[CoProcessFunction]]. 
+ */ +trait CleanupState { + + def registerProcessingCleanupTimer( + cleanupTimeState: ValueState[JLong], + currentTime: Long, + minRetentionTime: Long, + maxRetentionTime: Long, + timerService: TimerService): Unit = { + + // last registered timer + val curCleanupTime = cleanupTimeState.value() + + // check if a cleanup timer is registered and + // that the current cleanup timer won't delete state we need to keep + if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) { + // we need to register a new (later) timer + val cleanupTime = currentTime + maxRetentionTime + // register timer and remember clean-up time + timerService.registerProcessingTimeTimer(cleanupTime) + // delete expired timer + if (curCleanupTime != null) { + timerService.deleteProcessingTimeTimer(curCleanupTime) + } + cleanupTimeState.update(cleanupTime) + } + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CoProcessFunctionWithCleanupState.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CoProcessFunctionWithCleanupState.scala new file mode 100644 index 00000000000..0c7663621ba --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/CoProcessFunctionWithCleanupState.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.lang.{Long => JLong} + +import org.apache.flink.api.common.state.{State, ValueState, ValueStateDescriptor} +import org.apache.flink.streaming.api.TimeDomain +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} + +abstract class CoProcessFunctionWithCleanupState[IN1, IN2, OUT](queryConfig: StreamQueryConfig) + extends CoProcessFunction[IN1, IN2, OUT] + with CleanupState { + + protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime + protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime + protected val stateCleaningEnabled: Boolean = minRetentionTime > 1 + + // holds the latest registered cleanup timer + private var cleanupTimeState: ValueState[JLong] = _ + + protected def initCleanupTimeState(stateName: String) { + if (stateCleaningEnabled) { + val cleanupTimeDescriptor: ValueStateDescriptor[JLong] = + new ValueStateDescriptor[JLong](stateName, Types.LONG) + cleanupTimeState = getRuntimeContext.getState(cleanupTimeDescriptor) + } + } + + protected def processCleanupTimer( + ctx: CoProcessFunction[IN1, IN2, OUT]#Context, + currentTime: Long): Unit = { + if (stateCleaningEnabled) { + registerProcessingCleanupTimer( + cleanupTimeState, + currentTime, + minRetentionTime, + maxRetentionTime, + ctx.timerService() + ) + } + } + + protected def isProcessingTimeTimer(ctx: OnTimerContext): Boolean = { + ctx.timeDomain() == TimeDomain.PROCESSING_TIME + } + + protected 
def cleanupState(states: State*): Unit = { + // clear all state + states.foreach(_.clear()) + this.cleanupTimeState.clear() + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala index c59efe250c3..2d72e6de961 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala @@ -86,7 +86,7 @@ class GroupAggProcessFunction( val currentTime = ctx.timerService().currentProcessingTime() // register state-cleanup timer - registerProcessingCleanupTimer(ctx, currentTime) + processCleanupTimer(ctx, currentTime) val input = inputC.row @@ -172,7 +172,7 @@ class GroupAggProcessFunction( ctx: ProcessFunction[CRow, CRow]#OnTimerContext, out: Collector[CRow]): Unit = { - if (needToCleanupState(timestamp)) { + if (stateCleaningEnabled) { cleanupState(state, cntState) function.cleanup() } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala index 4d6840a3f43..edf5c2cd101 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala @@ -25,13 +25,15 @@ import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFu import org.apache.flink.table.api.{StreamQueryConfig, Types} abstract class KeyedProcessFunctionWithCleanupState[K, I, O](queryConfig: StreamQueryConfig) - extends 
KeyedProcessFunction[K, I, O] { + extends KeyedProcessFunction[K, I, O] + with CleanupState { + protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime protected val stateCleaningEnabled: Boolean = minRetentionTime > 1 // holds the latest registered cleanup timer - private var cleanupTimeState: ValueState[JLong] = _ + protected var cleanupTimeState: ValueState[JLong] = _ protected def initCleanupTimeState(stateName: String) { if (stateCleaningEnabled) { @@ -41,23 +43,17 @@ abstract class KeyedProcessFunctionWithCleanupState[K, I, O](queryConfig: Stream } } - protected def registerProcessingCleanupTimer( + protected def processCleanupTimer( ctx: KeyedProcessFunction[K, I, O]#Context, currentTime: Long): Unit = { if (stateCleaningEnabled) { - - // last registered timer - val curCleanupTime = cleanupTimeState.value() - - // check if a cleanup timer is registered and - // that the current cleanup timer won't delete state we need to keep - if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) { - // we need to register a new (later) timer - val cleanupTime = currentTime + maxRetentionTime - // register timer and remember clean-up time - ctx.timerService().registerProcessingTimeTimer(cleanupTime) - cleanupTimeState.update(cleanupTime) - } + registerProcessingCleanupTimer( + cleanupTimeState, + currentTime, + minRetentionTime, + maxRetentionTime, + ctx.timerService() + ) } } @@ -65,16 +61,6 @@ abstract class KeyedProcessFunctionWithCleanupState[K, I, O](queryConfig: Stream ctx.timeDomain() == TimeDomain.PROCESSING_TIME } - protected def needToCleanupState(timestamp: Long): Boolean = { - if (stateCleaningEnabled) { - val cleanupTime = cleanupTimeState.value() - // check that the triggered timer is the last registered processing time timer. 
- null != cleanupTime && timestamp == cleanupTime - } else { - false - } - } - protected def cleanupState(states: State*): Unit = { // clear all state states.foreach(_.clear()) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala index 591b942571f..6126dc73b30 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala @@ -95,7 +95,7 @@ class ProcTimeBoundedRangeOver( val currentTime = ctx.timerService.currentProcessingTime // register state-cleanup timer - registerProcessingCleanupTimer(ctx, currentTime) + processCleanupTimer(ctx, currentTime) // buffer the event incoming event @@ -117,11 +117,14 @@ class ProcTimeBoundedRangeOver( ctx: ProcessFunction[CRow, CRow]#OnTimerContext, out: Collector[CRow]): Unit = { - if (needToCleanupState(timestamp)) { - // clean up and return - cleanupState(rowMapState, accumulatorState) - function.cleanup() - return + if (stateCleaningEnabled) { + val cleanupTime = cleanupTimeState.value() + if (null != cleanupTime && timestamp == cleanupTime) { + // clean up and return + cleanupState(rowMapState, accumulatorState) + function.cleanup() + return + } } // remove timestamp set outside of ProcessFunction. @@ -131,11 +134,10 @@ class ProcTimeBoundedRangeOver( // that have registered this time trigger 1 ms ago val currentTime = timestamp - 1 - var i = 0 // get the list of elements of current proctime val currentElements = rowMapState.get(currentTime) - // Expired clean-up timers pass the needToCleanupState() check. + // Expired clean-up timers pass the needToCleanupState check. // Perform a null check to verify that we have data to process. 
if (null == currentElements) { return @@ -156,7 +158,6 @@ class ProcTimeBoundedRangeOver( // and eliminate them. Multiple elements could have been received at the same timestamp // the removal of old elements happens only once per proctime as onTimer is called only once val iter = rowMapState.iterator - val markToRemove = new ArrayList[Long]() while (iter.hasNext) { val entry = iter.next() val elementKey = entry.getKey @@ -169,17 +170,9 @@ class ProcTimeBoundedRangeOver( function.retract(accumulators, retractRow) iRemove += 1 } - // mark element for later removal not to modify the iterator over MapState - markToRemove.add(elementKey) + iter.remove() } } - // need to remove in 2 steps not to have concurrent access errors via iterator to the MapState - i = 0 - while (i < markToRemove.size()) { - rowMapState.remove(markToRemove.get(i)) - i += 1 - } - // add current elements to aggregator. Multiple elements might // have arrived in the same proctime diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala index ccddaa5b10f..fa58ac50529 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala @@ -110,7 +110,7 @@ class ProcTimeBoundedRowsOver( val currentTime = ctx.timerService.currentProcessingTime // register state-cleanup timer - registerProcessingCleanupTimer(ctx, currentTime) + processCleanupTimer(ctx, currentTime) // initialize state for the processed element var accumulators = accumulatorState.value @@ -187,7 +187,7 @@ class ProcTimeBoundedRowsOver( ctx: ProcessFunction[CRow, CRow]#OnTimerContext, out: Collector[CRow]): Unit = { - if (needToCleanupState(timestamp)) { + if (stateCleaningEnabled) 
{ cleanupState(rowMapState, accumulatorState, counterState, smallestTsState) function.cleanup() } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala index 6e4c5105786..ce1a95971a9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala @@ -71,7 +71,7 @@ class ProcTimeUnboundedOver( out: Collector[CRow]): Unit = { // register state-cleanup timer - registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) + processCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) val input = inputC.row @@ -95,7 +95,7 @@ class ProcTimeUnboundedOver( ctx: ProcessFunction[CRow, CRow]#OnTimerContext, out: Collector[CRow]): Unit = { - if (needToCleanupState(timestamp)) { + if (stateCleaningEnabled) { cleanupState(state) function.cleanup() } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala index 292fd3bdf1e..7263de72c4b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala @@ -26,40 +26,35 @@ import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.table.api.{StreamQueryConfig, Types} abstract class ProcessFunctionWithCleanupState[IN,OUT](queryConfig: StreamQueryConfig) - extends ProcessFunction[IN, OUT]{ + extends ProcessFunction[IN, OUT] + with 
CleanupState { protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime protected val stateCleaningEnabled: Boolean = minRetentionTime > 1 // holds the latest registered cleanup timer - private var cleanupTimeState: ValueState[JLong] = _ + protected var cleanupTimeState: ValueState[JLong] = _ protected def initCleanupTimeState(stateName: String) { if (stateCleaningEnabled) { - val inputCntDescriptor: ValueStateDescriptor[JLong] = + val cleanupTimeDescriptor: ValueStateDescriptor[JLong] = new ValueStateDescriptor[JLong](stateName, Types.LONG) - cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor) + cleanupTimeState = getRuntimeContext.getState(cleanupTimeDescriptor) } } - protected def registerProcessingCleanupTimer( + protected def processCleanupTimer( ctx: ProcessFunction[IN, OUT]#Context, currentTime: Long): Unit = { if (stateCleaningEnabled) { - - // last registered timer - val curCleanupTime = cleanupTimeState.value() - - // check if a cleanup timer is registered and - // that the current cleanup timer won't delete state we need to keep - if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) { - // we need to register a new (later) timer - val cleanupTime = currentTime + maxRetentionTime - // register timer and remember clean-up time - ctx.timerService().registerProcessingTimeTimer(cleanupTime) - cleanupTimeState.update(cleanupTime) - } + registerProcessingCleanupTimer( + cleanupTimeState, + currentTime, + minRetentionTime, + maxRetentionTime, + ctx.timerService() + ) } } @@ -67,16 +62,6 @@ abstract class ProcessFunctionWithCleanupState[IN,OUT](queryConfig: StreamQueryC ctx.timeDomain() == TimeDomain.PROCESSING_TIME } - protected def needToCleanupState(timestamp: Long): Boolean = { - if (stateCleaningEnabled) { - val cleanupTime = cleanupTimeState.value() - // check that the triggered timer is the last registered 
processing time timer. - null != cleanupTime && timestamp == cleanupTime - } else { - false - } - } - protected def cleanupState(states: State*): Unit = { // clear all state states.foreach(_.clear()) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala index b13acdf43cc..7c509d699e8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala @@ -114,7 +114,7 @@ class RowTimeBoundedRangeOver( val input = inputC.row // register state-cleanup timer - registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) + processCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) // triggering timestamp for trigger calculation val triggeringTs = input.getField(rowTimeIdx).asInstanceOf[Long] @@ -143,7 +143,7 @@ class RowTimeBoundedRangeOver( out: Collector[CRow]): Unit = { if (isProcessingTimeTimer(ctx.asInstanceOf[OnTimerContext])) { - if (needToCleanupState(timestamp)) { + if (stateCleaningEnabled) { val keysIt = dataState.keys.iterator() val lastProcessedTime = lastTriggeringTsState.value @@ -164,7 +164,7 @@ class RowTimeBoundedRangeOver( // There are records left to process because a watermark has not been received yet. // This would only happen if the input stream has stopped. So we don't need to clean up. 
// We leave the state as it is and schedule a new cleanup timer - registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) + processCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) } } return @@ -188,9 +188,6 @@ class RowTimeBoundedRangeOver( aggregatesIndex = 0 } - // keep up timestamps of retract data - val retractTsList: JList[Long] = new JArrayList[Long] - // do retraction val iter = dataState.iterator() while (iter.hasNext) { @@ -205,7 +202,7 @@ class RowTimeBoundedRangeOver( function.retract(accumulators, retractRow) dataListIndex += 1 } - retractTsList.add(dataTs) + iter.remove() } } @@ -230,20 +227,13 @@ class RowTimeBoundedRangeOver( dataListIndex += 1 } - // remove the data that has been retracted - dataListIndex = 0 - while (dataListIndex < retractTsList.size) { - dataState.remove(retractTsList.get(dataListIndex)) - dataListIndex += 1 - } - // update state accumulatorState.update(accumulators) } lastTriggeringTsState.update(timestamp) // update cleanup timer - registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) + processCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) } override def close(): Unit = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala index e120d6b0afd..d01a499e88e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala @@ -123,7 +123,7 @@ class RowTimeBoundedRowsOver( val input = inputC.row // register state-cleanup timer - registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) + processCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) // triggering 
timestamp for trigger calculation val triggeringTs = input.getField(rowTimeIdx).asInstanceOf[Long] @@ -152,7 +152,7 @@ class RowTimeBoundedRowsOver( out: Collector[CRow]): Unit = { if (isProcessingTimeTimer(ctx.asInstanceOf[OnTimerContext])) { - if (needToCleanupState(timestamp)) { + if (stateCleaningEnabled) { val keysIt = dataState.keys.iterator() val lastProcessedTime = lastTriggeringTsState.value @@ -173,7 +173,7 @@ class RowTimeBoundedRowsOver( // There are records left to process because a watermark has not been received yet. // This would only happen if the input stream has stopped. So we don't need to clean up. // We leave the state as it is and schedule a new cleanup timer - registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) + processCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) } } return @@ -263,7 +263,7 @@ class RowTimeBoundedRowsOver( lastTriggeringTsState.update(timestamp) // update cleanup timer - registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) + processCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) } override def close(): Unit = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala index 181c7680a35..690d0d05ee3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala @@ -108,7 +108,7 @@ abstract class RowTimeUnboundedOver( val input = inputC.row // register state-cleanup timer - registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) + processCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) val timestamp = input.getField(rowTimeIdx).asInstanceOf[Long] val 
curWatermark = ctx.timerService().currentWatermark() @@ -143,7 +143,7 @@ abstract class RowTimeUnboundedOver( out: Collector[CRow]): Unit = { if (isProcessingTimeTimer(ctx.asInstanceOf[OnTimerContext])) { - if (needToCleanupState(timestamp)) { + if (stateCleaningEnabled) { // we check whether there are still records which have not been processed yet val noRecordsToProcess = !rowMapState.keys.iterator().hasNext @@ -155,7 +155,7 @@ abstract class RowTimeUnboundedOver( // There are records left to process because a watermark has not been received yet. // This would only happen if the input stream has stopped. So we don't need to clean up. // We leave the state as it is and schedule a new cleanup timer - registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) + processCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) } } return @@ -207,7 +207,7 @@ abstract class RowTimeUnboundedOver( } // update cleanup timer - registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) + processCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) } /** diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoin.scala index 57c60f179c6..5b1069ff28b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoin.scala @@ -66,13 +66,12 @@ class NonWindowFullJoin( value: CRow, ctx: CoProcessFunction[CRow, CRow, CRow]#Context, out: Collector[CRow], - timerState: ValueState[Long], currentSideState: MapState[Row, JTuple2[Long, Long]], otherSideState: MapState[Row, JTuple2[Long, Long]], recordFromLeft: Boolean): Unit = { val inputRow = value.row - updateCurrentSide(value, ctx, timerState, currentSideState) + 
updateCurrentSide(value, ctx, currentSideState) cRowWrapper.reset() cRowWrapper.setCollector(out) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoinWithNonEquiPredicates.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoinWithNonEquiPredicates.scala index 9c27eb461a6..0166eef7961 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoinWithNonEquiPredicates.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowFullJoinWithNonEquiPredicates.scala @@ -68,14 +68,13 @@ class NonWindowFullJoinWithNonEquiPredicates( value: CRow, ctx: CoProcessFunction[CRow, CRow, CRow]#Context, out: Collector[CRow], - timerState: ValueState[Long], currentSideState: MapState[Row, JTuple2[Long, Long]], otherSideState: MapState[Row, JTuple2[Long, Long]], recordFromLeft: Boolean): Unit = { val currentJoinCntState = getJoinCntState(joinCntState, recordFromLeft) val inputRow = value.row - val cntAndExpiredTime = updateCurrentSide(value, ctx, timerState, currentSideState) + val cntAndExpiredTime = updateCurrentSide(value, ctx, currentSideState) if (!value.change && cntAndExpiredTime.f0 <= 0) { currentJoinCntState.remove(inputRow) } @@ -99,18 +98,18 @@ class NonWindowFullJoinWithNonEquiPredicates( } /** - * Removes records which are expired from left state. Register a new timer if the state still - * holds records after the clean-up. Also, clear leftJoinCnt map state when clear left - * rowMapState. + * Called when a processing timer trigger. + * Expire left/right expired records and expired joinCnt state. 
*/ - override def expireOutTimeRow( - curTime: Long, - rowMapState: MapState[Row, JTuple2[Long, Long]], - timerState: ValueState[Long], - isLeft: Boolean, - ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext): Unit = { + override def onTimer( + timestamp: Long, + ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext, + out: Collector[CRow]): Unit = { - expireOutTimeRow(curTime, rowMapState, timerState, isLeft, joinCntState, ctx) + // expired timer has already been removed, delete state directly. + if (stateCleaningEnabled) { + cleanupState(leftState, rightState, joinCntState(0), joinCntState(1)) + } } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala index 2e5832c2694..91a75077cd8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala @@ -63,13 +63,12 @@ class NonWindowInnerJoin( value: CRow, ctx: CoProcessFunction[CRow, CRow, CRow]#Context, out: Collector[CRow], - timerState: ValueState[Long], currentSideState: MapState[Row, JTuple2[Long, Long]], otherSideState: MapState[Row, JTuple2[Long, Long]], isLeft: Boolean): Unit = { val inputRow = value.row - updateCurrentSide(value, ctx, timerState, currentSideState) + updateCurrentSide(value, ctx, currentSideState) cRowWrapper.setCollector(out) cRowWrapper.setChange(value.change) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala index c59f4c2f44b..e15cbfa550c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.runtime.join import org.apache.flink.api.common.functions.FlatJoinFunction import org.apache.flink.api.common.functions.util.FunctionUtils -import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.state.{MapState, MapStateDescriptor} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.api.java.typeutils.TupleTypeInfo @@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.co.CoProcessFunction import org.apache.flink.table.api.{StreamQueryConfig, Types} import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.aggregate.CoProcessFunctionWithCleanupState import org.apache.flink.table.runtime.types.CRow import org.apache.flink.table.typeutils.TypeCheckUtils._ import org.apache.flink.table.util.Logging @@ -48,7 +49,7 @@ abstract class NonWindowJoin( genJoinFuncName: String, genJoinFuncCode: String, queryConfig: StreamQueryConfig) - extends CoProcessFunction[CRow, CRow, CRow] + extends CoProcessFunctionWithCleanupState[CRow, CRow, CRow](queryConfig) with Compiler[FlatJoinFunction[Row, Row, Row]] with Logging { @@ -62,15 +63,6 @@ abstract class NonWindowJoin( protected var rightState: MapState[Row, JTuple2[Long, Long]] = _ protected var cRowWrapper: CRowWrappingMultiOutputCollector = _ - protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime - protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime - protected val stateCleaningEnabled: Boolean = minRetentionTime > 1 - - // state to record last timer of left stream, 0 means no timer - protected var leftTimer: ValueState[Long] = _ - // state to record last timer of 
right stream, 0 means no timer - protected var rightTimer: ValueState[Long] = _ - // other condition function protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _ @@ -78,7 +70,8 @@ abstract class NonWindowJoin( protected var curProcessTime: Long = _ override def open(parameters: Configuration): Unit = { - LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n Code:\n$genJoinFuncCode") + LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " + + s"Code:\n$genJoinFuncCode") val clazz = compile( getRuntimeContext.getUserCodeClassLoader, genJoinFuncName, @@ -100,10 +93,7 @@ abstract class NonWindowJoin( rightState = getRuntimeContext.getMapState(rightStateDescriptor) // initialize timer state - val valueStateDescriptor1 = new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long]) - leftTimer = getRuntimeContext.getState(valueStateDescriptor1) - val valueStateDescriptor2 = new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long]) - rightTimer = getRuntimeContext.getState(valueStateDescriptor2) + initCleanupTimeState("NonWindowJoinCleanupTime") cRowWrapper = new CRowWrappingMultiOutputCollector() LOG.debug("Instantiating NonWindowJoin.") @@ -122,7 +112,7 @@ abstract class NonWindowJoin( ctx: CoProcessFunction[CRow, CRow, CRow]#Context, out: Collector[CRow]): Unit = { - processElement(valueC, ctx, out, leftTimer, leftState, rightState, isLeft = true) + processElement(valueC, ctx, out, leftState, rightState, isLeft = true) } /** @@ -138,7 +128,7 @@ abstract class NonWindowJoin( ctx: CoProcessFunction[CRow, CRow, CRow]#Context, out: Collector[CRow]): Unit = { - processElement(valueC, ctx, out, rightTimer, rightState, leftState, isLeft = false) + processElement(valueC, ctx, out, rightState, leftState, isLeft = false) } /** @@ -154,28 +144,13 @@ abstract class NonWindowJoin( ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext, out: Collector[CRow]): Unit = { - if (stateCleaningEnabled && leftTimer.value == timestamp) { - expireOutTimeRow( 
- timestamp, - leftState, - leftTimer, - isLeft = true, - ctx - ) - } - - if (stateCleaningEnabled && rightTimer.value == timestamp) { - expireOutTimeRow( - timestamp, - rightState, - rightTimer, - isLeft = false, - ctx - ) + // expired timer has already been removed, delete state directly. + if (stateCleaningEnabled) { + cleanupState(leftState, rightState) } } - def getNewExpiredTime(curProcessTime: Long, oldExpiredTime: Long): Long = { + protected def getNewExpiredTime(curProcessTime: Long, oldExpiredTime: Long): Long = { if (stateCleaningEnabled && curProcessTime + minRetentionTime > oldExpiredTime) { curProcessTime + maxRetentionTime } else { @@ -183,53 +158,15 @@ abstract class NonWindowJoin( } } - /** - * Removes records which are expired from the state. Register a new timer if the state still - * holds records after the clean-up. - */ - def expireOutTimeRow( - curTime: Long, - rowMapState: MapState[Row, JTuple2[Long, Long]], - timerState: ValueState[Long], - isLeft: Boolean, - ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext): Unit = { - - val rowMapIter = rowMapState.iterator() - var validTimestamp: Boolean = false - - while (rowMapIter.hasNext) { - val mapEntry = rowMapIter.next() - val recordExpiredTime = mapEntry.getValue.f1 - if (recordExpiredTime <= curTime) { - rowMapIter.remove() - } else { - // we found a timestamp that is still valid - validTimestamp = true - } - } - - // If the state has non-expired timestamps, register a new timer. - // Otherwise clean the complete state for this input. - if (validTimestamp) { - val cleanupTime = curTime + maxRetentionTime - ctx.timerService.registerProcessingTimeTimer(cleanupTime) - timerState.update(cleanupTime) - } else { - timerState.clear() - rowMapState.clear() - } - } - /** * Puts or Retract an element from the input stream into state and search the other state to * output records meet the condition. Records will be expired in state if state retention time * has been specified. 
*/ - def processElement( + protected def processElement( value: CRow, ctx: CoProcessFunction[CRow, CRow, CRow]#Context, out: Collector[CRow], - timerState: ValueState[Long], currentSideState: MapState[Row, JTuple2[Long, Long]], otherSideState: MapState[Row, JTuple2[Long, Long]], isLeft: Boolean): Unit @@ -240,14 +177,12 @@ abstract class NonWindowJoin( * * @param value The input CRow * @param ctx The ctx to register timer or get current time - * @param timerState The state to record last timer * @param currentSideState The state to hold current side stream element * @return The row number and expired time for current input row */ - def updateCurrentSide( + protected def updateCurrentSide( value: CRow, ctx: CoProcessFunction[CRow, CRow, CRow]#Context, - timerState: ValueState[Long], currentSideState: MapState[Row, JTuple2[Long, Long]]): JTuple2[Long, Long] = { val inputRow = value.row @@ -261,10 +196,7 @@ abstract class NonWindowJoin( cntAndExpiredTime.f1 = getNewExpiredTime(curProcessTime, cntAndExpiredTime.f1) // update timer if necessary - if (stateCleaningEnabled && timerState.value() == 0) { - timerState.update(cntAndExpiredTime.f1) - ctx.timerService().registerProcessingTimeTimer(cntAndExpiredTime.f1) - } + processCleanupTimer(ctx, curProcessTime) // update current side stream state if (!value.change) { @@ -282,7 +214,7 @@ abstract class NonWindowJoin( cntAndExpiredTime } - def callJoinFunction( + protected def callJoinFunction( inputRow: Row, inputRowFromLeft: Boolean, otherSideRow: Row, @@ -294,8 +226,4 @@ abstract class NonWindowJoin( joinFunction.join(otherSideRow, inputRow, cRowWrapper) } } - - override def close(): Unit = { - FunctionUtils.closeFunction(joinFunction) - } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala index b4f14e494f7..5995fb81a2c 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoin.scala @@ -69,13 +69,12 @@ class NonWindowLeftRightJoin( value: CRow, ctx: CoProcessFunction[CRow, CRow, CRow]#Context, out: Collector[CRow], - timerState: ValueState[Long], currentSideState: MapState[Row, JTuple2[Long, Long]], otherSideState: MapState[Row, JTuple2[Long, Long]], recordFromLeft: Boolean): Unit = { val inputRow = value.row - updateCurrentSide(value, ctx, timerState, currentSideState) + updateCurrentSide(value, ctx, currentSideState) cRowWrapper.reset() cRowWrapper.setCollector(out) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoinWithNonEquiPredicates.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoinWithNonEquiPredicates.scala index 33517cca5f9..a3e25f9489d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoinWithNonEquiPredicates.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftRightJoinWithNonEquiPredicates.scala @@ -71,14 +71,13 @@ class NonWindowLeftRightJoinWithNonEquiPredicates( value: CRow, ctx: CoProcessFunction[CRow, CRow, CRow]#Context, out: Collector[CRow], - timerState: ValueState[Long], currentSideState: MapState[Row, JTuple2[Long, Long]], otherSideState: MapState[Row, JTuple2[Long, Long]], recordFromLeft: Boolean): Unit = { val currentJoinCntState = getJoinCntState(joinCntState, recordFromLeft) val inputRow = value.row - val cntAndExpiredTime = updateCurrentSide(value, ctx, timerState, currentSideState) + val cntAndExpiredTime = updateCurrentSide(value, ctx, currentSideState) if (!value.change && cntAndExpiredTime.f0 <= 0 && recordFromLeft == isLeftJoin) { 
currentJoinCntState.remove(inputRow) } @@ -101,17 +100,21 @@ class NonWindowLeftRightJoinWithNonEquiPredicates( } /** - * Removes records which are expired from state. Register a new timer if the state still - * holds records after the clean-up. Also, clear joinCnt map state when clear rowMapState. + * Called when a processing timer triggers. + * Expire left/right expired records and expired joinCnt state. */ - override def expireOutTimeRow( - curTime: Long, - rowMapState: MapState[Row, JTuple2[Long, Long]], - timerState: ValueState[Long], - isLeft: Boolean, - ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext): Unit = { + override def onTimer( + timestamp: Long, + ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext, + out: Collector[CRow]): Unit = { - expireOutTimeRow(curTime, rowMapState, timerState, isLeft, joinCntState, ctx) + // expired timer has already been removed, delete state directly. + if (stateCleaningEnabled) { + cleanupState( + leftState, + rightState, + getJoinCntState(joinCntState, isLeftJoin)) + } } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala index 0018a16bc23..9fadbb0eb64 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoin.scala @@ -73,7 +73,7 @@ abstract class NonWindowOuterJoin( * @param otherSideState the other side state * @return the number of matched rows */ - def preservedJoin( + protected def preservedJoin( inputRow: Row, inputRowFromLeft: Boolean, otherSideState: MapState[Row, JTuple2[Long, Long]]): Long = { @@ -106,7 +106,7 @@ abstract class NonWindowOuterJoin( * RowWrapper has been reset before we call retractJoin and we also assume that the current * change of cRowWrapper is equal to
value.change. */ - def retractJoin( + protected def retractJoin( value: CRow, inputRowFromLeft: Boolean, currentSideState: MapState[Row, JTuple2[Long, Long]], @@ -152,7 +152,8 @@ abstract class NonWindowOuterJoin( * Return approximate number of records in corresponding state. Only check if record number is * 0, 1 or bigger. */ - def approxiRecordNumInState(currentSideState: MapState[Row, JTuple2[Long, Long]]): Long = { + protected def approxiRecordNumInState( + currentSideState: MapState[Row, JTuple2[Long, Long]]): Long = { var recordNum = 0L val it = currentSideState.iterator() while(it.hasNext && recordNum < 2) { @@ -164,7 +165,7 @@ abstract class NonWindowOuterJoin( /** * Append input row with default null value if there is no match and Collect. */ - def collectAppendNull( + protected def collectAppendNull( inputRow: Row, inputFromLeft: Boolean, out: Collector[Row]): Unit = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala index 8fe2f4fe410..0eb6a8114fd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowOuterJoinWithNonEquiPredicates.scala @@ -21,7 +21,6 @@ import org.apache.flink.api.common.state._ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.configuration.Configuration -import org.apache.flink.streaming.api.functions.co.CoProcessFunction import org.apache.flink.table.api.{StreamQueryConfig, Types} import org.apache.flink.table.runtime.types.CRow import org.apache.flink.types.Row @@ -83,7 +82,7 @@ import org.apache.flink.types.Row * unmatched or vice versa. 
The RowWrapper has been reset before we call retractJoin and we * also assume that the current change of cRowWrapper is equal to value.change. */ - def retractJoinWithNonEquiPreds( + protected def retractJoinWithNonEquiPreds( value: CRow, inputRowFromLeft: Boolean, otherSideState: MapState[Row, JTuple2[Long, Long]], @@ -131,48 +130,6 @@ import org.apache.flink.types.Row } } - /** - * Removes records which are expired from state. Registers a new timer if the state still - * holds records after the clean-up. Also, clear joinCnt map state when clear rowMapState. - */ - def expireOutTimeRow( - curTime: Long, - rowMapState: MapState[Row, JTuple2[Long, Long]], - timerState: ValueState[Long], - isLeft: Boolean, - joinCntState: Array[MapState[Row, Long]], - ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext): Unit = { - - val currentJoinCntState = getJoinCntState(joinCntState, isLeft) - val rowMapIter = rowMapState.iterator() - var validTimestamp: Boolean = false - - while (rowMapIter.hasNext) { - val mapEntry = rowMapIter.next() - val recordExpiredTime = mapEntry.getValue.f1 - if (recordExpiredTime <= curTime) { - rowMapIter.remove() - currentJoinCntState.remove(mapEntry.getKey) - } else { - // we found a timestamp that is still valid - validTimestamp = true - } - } - // If the state has non-expired timestamps, register a new timer. - // Otherwise clean the complete state for this input. - if (validTimestamp) { - val cleanupTime = curTime + maxRetentionTime - ctx.timerService.registerProcessingTimeTimer(cleanupTime) - timerState.update(cleanupTime) - } else { - timerState.clear() - rowMapState.clear() - if (isLeft == isLeftJoin) { - currentJoinCntState.clear() - } - } - } - /** * Get left or right join cnt state. 
* @@ -181,7 +138,7 @@ import org.apache.flink.types.Row * @param isLeftCntState the flag whether get the left join cnt state * @return the corresponding join cnt state */ - def getJoinCntState( + protected def getJoinCntState( joinCntState: Array[MapState[Row, Long]], isLeftCntState: Boolean) : MapState[Row, Long] = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala index 3c18449e953..6ae5e6340a6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala @@ -71,6 +71,10 @@ class StateCleaningCountTrigger(queryConfig: StreamQueryConfig, maxCount: Long) val cleanupTime = currentTime + maxRetentionTime // register timer and remember clean-up time ctx.registerProcessingTimeTimer(cleanupTime) + // delete expired timer + if (curCleanupTime != null) { + ctx.deleteProcessingTimeTimer(curCleanupTime) + } ctx.getPartitionedState(cleanupStateDesc).update(cleanupTime) } @@ -93,12 +97,9 @@ class StateCleaningCountTrigger(queryConfig: StreamQueryConfig, maxCount: Long) ctx: TriggerContext): TriggerResult = { if (stateCleaningEnabled) { - val cleanupTime = ctx.getPartitionedState(cleanupStateDesc).value() - // check that the triggered timer is the last registered processing time timer. 
- if (null != cleanupTime && time == cleanupTime) { - clear(window, ctx) - return TriggerResult.FIRE_AND_PURGE - } + // do clear directly, since we have already deleted expired timer + clear(window, ctx) + return TriggerResult.FIRE_AND_PURGE } TriggerResult.CONTINUE } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala index bd19be83757..4619c759c31 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala @@ -21,21 +21,18 @@ import java.lang.{Integer => JInt, Long => JLong} import java.util.concurrent.ConcurrentLinkedQueue import org.apache.flink.api.common.time.Time -import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.java.operators.join.JoinType -import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.runtime.streamrecord.StreamRecord import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness -import org.apache.flink.table.api.{StreamQueryConfig, Types} -import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, RowResultSortComparatorWithWatermarks, TestStreamQueryConfig, TupleRowKeySelector} +import org.apache.flink.table.api.Types +import org.apache.flink.table.runtime.harness.HarnessTestBase.{TestStreamQueryConfig, TupleRowKeySelector} import org.apache.flink.table.runtime.join._ import 
org.apache.flink.table.runtime.operators.KeyedCoProcessOperatorWithWatermarkDelay -import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} -import org.apache.flink.types.Row -import org.junit.Assert.{assertEquals, assertTrue} +import org.apache.flink.table.runtime.types.CRow +import org.junit.Assert.assertEquals import org.junit.Test /** @@ -830,14 +827,6 @@ class JoinHarnessTest extends HarnessTestBase { @Test def testNonWindowInnerJoin() { - val joinReturnType = CRowTypeInfo(new RowTypeInfo( - Array[TypeInformation[_]]( - INT_TYPE_INFO, - STRING_TYPE_INFO, - INT_TYPE_INFO, - STRING_TYPE_INFO), - Array("a", "b", "c", "d"))) - val joinProcessFunc = new NonWindowInnerJoin( rowType, rowType, @@ -879,35 +868,32 @@ class JoinHarnessTest extends HarnessTestBase { // right stream input and output normally testHarness.processElement2(new StreamRecord( CRow(1: JInt, "Hi1"))) - assertEquals(6, testHarness.numKeyedStateEntries()) - assertEquals(3, testHarness.numProcessingTimeTimers()) + // lkeys(1, 2) rkeys(1) timer_key_time(1:5, 2:6) + assertEquals(5, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) testHarness.setProcessingTime(4) testHarness.processElement2(new StreamRecord( CRow(2: JInt, "Hello1"))) - assertEquals(8, testHarness.numKeyedStateEntries()) - assertEquals(4, testHarness.numProcessingTimeTimers()) + // lkeys(1, 2) rkeys(1, 2) timer_key_time(1:5, 2:6) + assertEquals(6, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) - // expired left stream record with key value of 1 + // expired stream record with key value of 1 testHarness.setProcessingTime(5) testHarness.processElement2(new StreamRecord( CRow(1: JInt, "Hi2"))) - assertEquals(6, testHarness.numKeyedStateEntries()) - assertEquals(3, testHarness.numProcessingTimeTimers()) - - // expired all left stream record - testHarness.setProcessingTime(6) - assertEquals(4, testHarness.numKeyedStateEntries()) + // lkeys(2) 
rkeys(1, 2) timer_key_time(1:9, 2:6) + assertEquals(5, testHarness.numKeyedStateEntries()) assertEquals(2, testHarness.numProcessingTimeTimers()) - // expired right stream record with key value of 2 - testHarness.setProcessingTime(8) + // expired all left stream records + testHarness.setProcessingTime(6) + // lkeys() rkeys(1) timer_key_time(1:9) assertEquals(2, testHarness.numKeyedStateEntries()) assertEquals(1, testHarness.numProcessingTimeTimers()) - testHarness.setProcessingTime(10) - assertTrue(testHarness.numKeyedStateEntries() > 0) - // expired all right stream record - testHarness.setProcessingTime(11) + // expired all stream records + testHarness.setProcessingTime(9) assertEquals(0, testHarness.numKeyedStateEntries()) assertEquals(0, testHarness.numProcessingTimeTimers()) @@ -975,32 +961,37 @@ class JoinHarnessTest extends HarnessTestBase { CRow(1: JInt, "Hi1"))) testHarness.processElement2(new StreamRecord( CRow(false, 1: JInt, "Hi1"))) - assertEquals(5, testHarness.numKeyedStateEntries()) - assertEquals(3, testHarness.numProcessingTimeTimers()) + // lkeys(1, 2) rkeys() timer_key_time(1:5, 2:6) + assertEquals(4, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) testHarness.setProcessingTime(4) testHarness.processElement2(new StreamRecord( CRow(2: JInt, "Hello1"))) - assertEquals(7, testHarness.numKeyedStateEntries()) - assertEquals(4, testHarness.numProcessingTimeTimers()) + // lkeys(1, 2) rkeys(2) timer_key_time(1:5, 2:6) + assertEquals(5, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) testHarness.processElement1(new StreamRecord( CRow(false, 1: JInt, "aaa"))) - // expired left stream record with key value of 1 + // expired stream records with key value of 1 testHarness.setProcessingTime(5) + // lkeys(2) rkeys(2) timer_key_time(2:6) testHarness.processElement2(new StreamRecord( CRow(1: JInt, "Hi2"))) testHarness.processElement2(new StreamRecord( CRow(false, 1: JInt, 
"Hi2"))) - assertEquals(5, testHarness.numKeyedStateEntries()) - assertEquals(3, testHarness.numProcessingTimeTimers()) + // lkeys(2) rkeys(2) timer_key_time(1:9, 2:6) + assertEquals(4, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) - // expired all left stream record + // expired all stream records testHarness.setProcessingTime(6) - assertEquals(3, testHarness.numKeyedStateEntries()) - assertEquals(2, testHarness.numProcessingTimeTimers()) + // lkeys() rkeys() timer_key_time(1:9) + assertEquals(1, testHarness.numKeyedStateEntries()) + assertEquals(1, testHarness.numProcessingTimeTimers()) - // expired right stream record with key value of 2 - testHarness.setProcessingTime(8) + // expired all data + testHarness.setProcessingTime(9) assertEquals(0, testHarness.numKeyedStateEntries()) assertEquals(0, testHarness.numProcessingTimeTimers()) @@ -1067,32 +1058,36 @@ class JoinHarnessTest extends HarnessTestBase { CRow(1: JInt, "Hi1"))) testHarness.processElement2(new StreamRecord( CRow(false, 1: JInt, "Hi1"))) - assertEquals(5, testHarness.numKeyedStateEntries()) - assertEquals(3, testHarness.numProcessingTimeTimers()) + // lkeys(1, 2) rkeys() timer_key_time(1:5, 2:6) + assertEquals(4, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) testHarness.setProcessingTime(4) testHarness.processElement2(new StreamRecord( CRow(2: JInt, "Hello1"))) - assertEquals(7, testHarness.numKeyedStateEntries()) - assertEquals(4, testHarness.numProcessingTimeTimers()) + // lkeys(1, 2) rkeys(2) timer_key_time(1:5, 2:6) + assertEquals(5, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) testHarness.processElement1(new StreamRecord( CRow(false, 1: JInt, "aaa"))) - // expired left stream record with key value of 1 + // expired stream records with key value of 1 testHarness.setProcessingTime(5) testHarness.processElement2(new StreamRecord( CRow(1: JInt, "Hi2"))) 
testHarness.processElement2(new StreamRecord( CRow(false, 1: JInt, "Hi2"))) - assertEquals(5, testHarness.numKeyedStateEntries()) - assertEquals(3, testHarness.numProcessingTimeTimers()) + // lkeys(2) rkeys(2) timer_key_time(1:9, 2:6) + assertEquals(4, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) - // expired all left stream record + // expired stream records with key value of 2 testHarness.setProcessingTime(6) - assertEquals(3, testHarness.numKeyedStateEntries()) - assertEquals(2, testHarness.numProcessingTimeTimers()) + // lkeys() rkeys() timer_key_time(1:9) + assertEquals(1, testHarness.numKeyedStateEntries()) + assertEquals(1, testHarness.numProcessingTimeTimers()) - // expired right stream record with key value of 2 - testHarness.setProcessingTime(8) + // expired all data + testHarness.setProcessingTime(9) assertEquals(0, testHarness.numKeyedStateEntries()) assertEquals(0, testHarness.numProcessingTimeTimers()) @@ -1160,7 +1155,7 @@ class JoinHarnessTest extends HarnessTestBase { testHarness.processElement1(new StreamRecord( CRow(1: JInt, "bbb"))) assertEquals(1, testHarness.numProcessingTimeTimers()) - // 1 left timer(5), 1 left key(1), 1 join cnt + // lkeys(1) rkeys() timer_key_time(1:5) assertEquals(3, testHarness.numKeyedStateEntries()) testHarness.setProcessingTime(2) testHarness.processElement1(new StreamRecord( @@ -1168,7 +1163,8 @@ class JoinHarnessTest extends HarnessTestBase { testHarness.processElement1(new StreamRecord( CRow(2: JInt, "bbb"))) assertEquals(2, testHarness.numProcessingTimeTimers()) - // 2 left timer(5,6), 2 left key(1,2), 2 join cnt + // lkeys(1, 2) rkeys() timer_key_time(1:5, 2:6) + // l_join_cnt_keys(1, 2) assertEquals(6, testHarness.numKeyedStateEntries()) testHarness.setProcessingTime(3) @@ -1177,17 +1173,19 @@ class JoinHarnessTest extends HarnessTestBase { CRow(1: JInt, "Hi1"))) testHarness.processElement2(new StreamRecord( CRow(false, 1: JInt, "bbb"))) - // 2 left timer(5,6), 2 
left keys(1,2), 2 join cnt, 1 right timer(7), 1 right key(1) - assertEquals(8, testHarness.numKeyedStateEntries()) - assertEquals(3, testHarness.numProcessingTimeTimers()) + // lkeys(1, 2) rkeys(1) timer_key_time(1:5, 2:6) + // l_join_cnt_keys(1, 2) + assertEquals(7, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) testHarness.setProcessingTime(4) testHarness.processElement2(new StreamRecord( CRow(2: JInt, "ccc"))) testHarness.processElement2(new StreamRecord( CRow(2: JInt, "Hello"))) - // 2 left timer(5,6), 2 left keys(1,2), 2 join cnt, 2 right timer(7,8), 2 right key(1,2) - assertEquals(10, testHarness.numKeyedStateEntries()) - assertEquals(4, testHarness.numProcessingTimeTimers()) + // lkeys(1, 2) rkeys(1, 2) timer_key_time(1:5, 2:6) + // l_join_cnt_keys(1, 2) + assertEquals(8, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) testHarness.processElement1(new StreamRecord( CRow(false, 1: JInt, "aaa"))) @@ -1197,22 +1195,29 @@ class JoinHarnessTest extends HarnessTestBase { CRow(false, 1: JInt, "Hi2"))) testHarness.processElement2(new StreamRecord( CRow(false, 1: JInt, "Hi1"))) - // expired left stream record with key value of 1 + // lkeys(1, 2) rkeys(2) timer_key_time(1:8, 2:6) + // l_join_cnt_keys(1, 2) + assertEquals(7, testHarness.numKeyedStateEntries()) testHarness.setProcessingTime(5) + // [1]. this will clean up left stream records with expired time of 5 testHarness.processElement2(new StreamRecord( CRow(1: JInt, "Hi3"))) + // [2]. 
there are no elements that can be connected, since they were cleaned by [1] testHarness.processElement2(new StreamRecord( CRow(false, 1: JInt, "Hi3"))) - // 1 left timer(6), 1 left keys(2), 1 join cnt, 2 right timer(7,8), 1 right key(2) - assertEquals(6, testHarness.numKeyedStateEntries()) - assertEquals(3, testHarness.numProcessingTimeTimers()) + // lkeys(1, 2) rkeys(2) timer_key_time(1:8, 2:6) + // l_join_cnt_keys(1, 2) + assertEquals(7, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) - // expired all left stream record + // expired all records with key value of 2 testHarness.setProcessingTime(6) + // lkeys(1) rkeys() timer_key_time(1:8) + // l_join_cnt_keys(1) assertEquals(3, testHarness.numKeyedStateEntries()) - assertEquals(2, testHarness.numProcessingTimeTimers()) + assertEquals(1, testHarness.numProcessingTimeTimers()) - // expired right stream record with key value of 2 + // expired all data testHarness.setProcessingTime(8) assertEquals(0, testHarness.numKeyedStateEntries()) assertEquals(0, testHarness.numProcessingTimeTimers()) @@ -1253,6 +1258,12 @@ class JoinHarnessTest extends HarnessTestBase { CRow(false, 1: JInt, "bbb", 1: JInt, "Hi1"))) expectedOutput.add(new StreamRecord( CRow(1: JInt, "bbb", null: JInt, null))) + // processing time of 5 + // timer of 8, we use only one timer state now + expectedOutput.add(new StreamRecord( + CRow(false, 1: JInt, "bbb", null: JInt, null))) + expectedOutput.add(new StreamRecord( + CRow(1: JInt, "bbb", 1: JInt, "Hi3"))) verify(expectedOutput, result) testHarness.close() @@ -1305,32 +1316,36 @@ class JoinHarnessTest extends HarnessTestBase { CRow(1: JInt, "Hi1"))) testHarness.processElement1(new StreamRecord( CRow(false, 1: JInt, "Hi1"))) - assertEquals(5, testHarness.numKeyedStateEntries()) - assertEquals(3, testHarness.numProcessingTimeTimers()) + // lkeys() rkeys(1, 2) timer_key_time(1:5, 2:6) + assertEquals(4, testHarness.numKeyedStateEntries()) + assertEquals(2,
testHarness.numProcessingTimeTimers()) testHarness.setProcessingTime(4) testHarness.processElement1(new StreamRecord( CRow(2: JInt, "Hello1"))) - assertEquals(7, testHarness.numKeyedStateEntries()) - assertEquals(4, testHarness.numProcessingTimeTimers()) + // lkeys(2) rkeys(1, 2) timer_key_time(1:5, 2:6) + assertEquals(5, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) testHarness.processElement2(new StreamRecord( CRow(false, 1: JInt, "aaa"))) - // expired right stream record with key value of 1 + // expired stream records with key value of 1 testHarness.setProcessingTime(5) testHarness.processElement1(new StreamRecord( CRow(1: JInt, "Hi2"))) testHarness.processElement1(new StreamRecord( CRow(false, 1: JInt, "Hi2"))) - assertEquals(5, testHarness.numKeyedStateEntries()) - assertEquals(3, testHarness.numProcessingTimeTimers()) + // lkeys(2) rkeys(2) timer_key_time(1:9, 2:6) + assertEquals(4, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) - // expired all right stream record + // expired stream records with key value of 2 testHarness.setProcessingTime(6) - assertEquals(3, testHarness.numKeyedStateEntries()) - assertEquals(2, testHarness.numProcessingTimeTimers()) + // lkeys() rkeys() timer_key_time(1:9) + assertEquals(1, testHarness.numKeyedStateEntries()) + assertEquals(1, testHarness.numProcessingTimeTimers()) - // expired left stream record with key value of 2 - testHarness.setProcessingTime(8) + // expired all data + testHarness.setProcessingTime(9) assertEquals(0, testHarness.numKeyedStateEntries()) assertEquals(0, testHarness.numProcessingTimeTimers()) @@ -1398,15 +1413,17 @@ class JoinHarnessTest extends HarnessTestBase { testHarness.processElement2(new StreamRecord( CRow(1: JInt, "bbb"))) assertEquals(1, testHarness.numProcessingTimeTimers()) - // 1 right timer(5), 1 right key(1), 1 join cnt + // lkeys() rkeys(1) timer_key_time(1:5) + // r_join_cnt_keys(1) 
assertEquals(3, testHarness.numKeyedStateEntries()) testHarness.setProcessingTime(2) testHarness.processElement2(new StreamRecord( CRow(1: JInt, "aaa"))) testHarness.processElement2(new StreamRecord( CRow(2: JInt, "bbb"))) + // lkeys() rkeys(1, 2) timer_key_time(1:5, 2:6) + // r_join_cnt_keys(1, 2) assertEquals(2, testHarness.numProcessingTimeTimers()) - // 2 right timer(5,6), 2 right key(1,2), 2 join cnt assertEquals(6, testHarness.numKeyedStateEntries()) testHarness.setProcessingTime(3) @@ -1415,17 +1432,19 @@ class JoinHarnessTest extends HarnessTestBase { CRow(1: JInt, "Hi1"))) testHarness.processElement1(new StreamRecord( CRow(false, 1: JInt, "bbb"))) - // 2 right timer(5,6), 2 right keys(1,2), 2 join cnt, 1 left timer(7), 1 left key(1) - assertEquals(8, testHarness.numKeyedStateEntries()) - assertEquals(3, testHarness.numProcessingTimeTimers()) + // lkeys(1) rkeys(1, 2) timer_key_time(1:5, 2:6) + // r_join_cnt_keys(1, 2) + assertEquals(7, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) testHarness.setProcessingTime(4) testHarness.processElement1(new StreamRecord( CRow(2: JInt, "ccc"))) testHarness.processElement1(new StreamRecord( CRow(2: JInt, "Hello"))) - // 2 right timer(5,6), 2 right keys(1,2), 2 join cnt, 2 left timer(7,8), 2 left key(1,2) - assertEquals(10, testHarness.numKeyedStateEntries()) - assertEquals(4, testHarness.numProcessingTimeTimers()) + // lkeys(1, 2) rkeys(1, 2) timer_key_time(1:5, 2:6) + // r_join_cnt_keys(1, 2) + assertEquals(8, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) testHarness.processElement2(new StreamRecord( CRow(false, 1: JInt, "aaa"))) @@ -1435,22 +1454,27 @@ class JoinHarnessTest extends HarnessTestBase { CRow(false, 1: JInt, "Hi2"))) testHarness.processElement1(new StreamRecord( CRow(false, 1: JInt, "Hi1"))) - // expired right stream record with key value of 1 + // lkeys(2) rkeys(1, 2) timer_key_time(1:8, 2:6) + // 
r_join_cnt_keys(1, 2) + assertEquals(7, testHarness.numKeyedStateEntries()) testHarness.setProcessingTime(5) testHarness.processElement1(new StreamRecord( CRow(1: JInt, "Hi3"))) testHarness.processElement1(new StreamRecord( CRow(false, 1: JInt, "Hi3"))) - // 1 right timer(6), 1 right keys(2), 1 join cnt, 2 left timer(7,8), 1 left key(2) - assertEquals(6, testHarness.numKeyedStateEntries()) - assertEquals(3, testHarness.numProcessingTimeTimers()) + // lkeys(2) rkeys(1, 2) timer_key_time(1:8, 2:6) + // r_join_cnt_keys(1, 2) + assertEquals(7, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) - // expired all right stream record + // expired all stream records with key value of 2 + // lkeys() rkeys(1) timer_key_time(1:8) + // r_join_cnt_keys(1) testHarness.setProcessingTime(6) assertEquals(3, testHarness.numKeyedStateEntries()) - assertEquals(2, testHarness.numProcessingTimeTimers()) + assertEquals(1, testHarness.numProcessingTimeTimers()) - // expired left stream record with key value of 2 + // expired all data testHarness.setProcessingTime(8) assertEquals(0, testHarness.numKeyedStateEntries()) assertEquals(0, testHarness.numProcessingTimeTimers()) @@ -1491,6 +1515,12 @@ class JoinHarnessTest extends HarnessTestBase { CRow(false, 1: JInt, "Hi1", 1: JInt, "bbb"))) expectedOutput.add(new StreamRecord( CRow(null: JInt, null, 1: JInt, "bbb"))) + // processing time of 5 + // timer of 8, we use only one timer state now + expectedOutput.add(new StreamRecord( + CRow(false, null: JInt, null, 1: JInt, "bbb"))) + expectedOutput.add(new StreamRecord( + CRow(1: JInt, "Hi3", 1: JInt, "bbb"))) verify(expectedOutput, result) testHarness.close() @@ -1524,8 +1554,8 @@ class JoinHarnessTest extends HarnessTestBase { CRow(1: JInt, "bbb"))) testHarness.processElement1(new StreamRecord( CRow(1: JInt, "ccc"))) + // lkeys(1) rkeys() timer_key_time(1:5) assertEquals(1, testHarness.numProcessingTimeTimers()) - // 1 left timer(5), 1 left key(1) 
assertEquals(2, testHarness.numKeyedStateEntries()) testHarness.setProcessingTime(2) @@ -1534,8 +1564,7 @@ class JoinHarnessTest extends HarnessTestBase { testHarness.processElement2(new StreamRecord( CRow(2: JInt, "ccc"))) assertEquals(2, testHarness.numProcessingTimeTimers()) - // 1 left timer(5), 1 left key(1) - // 1 right timer(6), 1 right key(1) + // lkeys(1) rkeys(2) timer_key_time(1:5, 2:6) assertEquals(4, testHarness.numKeyedStateEntries()) testHarness.setProcessingTime(3) @@ -1543,18 +1572,16 @@ class JoinHarnessTest extends HarnessTestBase { CRow(2: JInt, "aaa"))) testHarness.processElement1(new StreamRecord( CRow(2: JInt, "ddd"))) - assertEquals(3, testHarness.numProcessingTimeTimers()) - // 2 left timer(5,7), 2 left key(1,2) - // 1 right timer(6), 1 right key(1) - assertEquals(6, testHarness.numKeyedStateEntries()) + // lkeys(1, 2) rkeys(2) timer_key_time(1:5, 2:6) + assertEquals(2, testHarness.numProcessingTimeTimers()) + assertEquals(5, testHarness.numKeyedStateEntries()) testHarness.processElement2(new StreamRecord( CRow(1: JInt, "aaa"))) testHarness.processElement2(new StreamRecord( CRow(1: JInt, "ddd"))) - assertEquals(4, testHarness.numProcessingTimeTimers()) - // 2 left timer(5,7), 2 left key(1,2) - // 2 right timer(6,7), 2 right key(1,2) - assertEquals(8, testHarness.numKeyedStateEntries()) + // lkeys(1, 2) rkeys(1, 2) timer_key_time(1:5, 2:6) + assertEquals(2, testHarness.numProcessingTimeTimers()) + assertEquals(6, testHarness.numKeyedStateEntries()) testHarness.setProcessingTime(4) testHarness.processElement1(new StreamRecord( @@ -1565,28 +1592,26 @@ class JoinHarnessTest extends HarnessTestBase { CRow(false, 1: JInt, "aaa"))) testHarness.processElement2(new StreamRecord( CRow(false, 1: JInt, "ddd"))) - assertEquals(4, testHarness.numProcessingTimeTimers()) - // 2 left timer(5,7), 1 left key(1) - // 2 right timer(6,7), 1 right key(2) - assertEquals(6, testHarness.numKeyedStateEntries()) + // lkeys(1) rkeys(2) timer_key_time(1:8, 2:6) + 
assertEquals(2, testHarness.numProcessingTimeTimers()) + assertEquals(4, testHarness.numKeyedStateEntries()) testHarness.setProcessingTime(5) - assertEquals(3, testHarness.numProcessingTimeTimers()) - // 1 left timer(7) - // 2 right timer(6,7), 1 right key(2) + assertEquals(2, testHarness.numProcessingTimeTimers()) assertEquals(4, testHarness.numKeyedStateEntries()) testHarness.setProcessingTime(6) - assertEquals(2, testHarness.numProcessingTimeTimers()) - // 1 left timer(7) - // 2 right timer(7) + // lkeys(1) rkeys() timer_key_time(1:8) + assertEquals(1, testHarness.numProcessingTimeTimers()) assertEquals(2, testHarness.numKeyedStateEntries()) testHarness.setProcessingTime(7) - assertEquals(0, testHarness.numProcessingTimeTimers()) - assertEquals(0, testHarness.numKeyedStateEntries()) + assertEquals(1, testHarness.numProcessingTimeTimers()) + assertEquals(2, testHarness.numKeyedStateEntries()) testHarness.setProcessingTime(8) + assertEquals(0, testHarness.numProcessingTimeTimers()) + assertEquals(0, testHarness.numKeyedStateEntries()) testHarness.processElement1(new StreamRecord( CRow(1: JInt, "bbb"))) testHarness.processElement2(new StreamRecord( @@ -1693,8 +1718,9 @@ class JoinHarnessTest extends HarnessTestBase { CRow(1: JInt, "bbb"))) testHarness.processElement1(new StreamRecord( CRow(1: JInt, "ccc"))) + // lkeys(1) rkeys() timer_key_time(1:5) + // l_join_cnt_keys(1) r_join_cnt_keys() assertEquals(1, testHarness.numProcessingTimeTimers()) - // 1 left timer(5), 1 left key(1), 1 left joincnt key(1) assertEquals(3, testHarness.numKeyedStateEntries()) testHarness.setProcessingTime(2) @@ -1702,9 +1728,9 @@ class JoinHarnessTest extends HarnessTestBase { CRow(2: JInt, "bbb"))) testHarness.processElement2(new StreamRecord( CRow(2: JInt, "ccc"))) + // lkeys(1) rkeys(2) timer_key_time(1:5, 2:6) + // l_join_cnt_keys(1) r_join_cnt_keys(2) assertEquals(2, testHarness.numProcessingTimeTimers()) - // 1 left timer(5), 1 left key(1), 1 left joincnt key(1) - // 1 right 
timer(6), 1 right key(1), 1 right joincnt key(1) assertEquals(6, testHarness.numKeyedStateEntries()) testHarness.setProcessingTime(3) @@ -1712,46 +1738,46 @@ class JoinHarnessTest extends HarnessTestBase { CRow(2: JInt, "aaa"))) testHarness.processElement1(new StreamRecord( CRow(2: JInt, "ddd"))) - assertEquals(3, testHarness.numProcessingTimeTimers()) - // 2 left timer(5,7), 2 left key(1,2), 2 left joincnt key(1,2) - // 1 right timer(6), 1 right key(1), 1 right joincnt key(1) - assertEquals(9, testHarness.numKeyedStateEntries()) + // lkeys(1, 2) rkeys(2) timer_key_time(1:5, 2:6) + // l_join_cnt_keys(1, 2) r_join_cnt_keys(2) + assertEquals(2, testHarness.numProcessingTimeTimers()) + assertEquals(8, testHarness.numKeyedStateEntries()) testHarness.processElement2(new StreamRecord( CRow(1: JInt, "aaa"))) testHarness.processElement2(new StreamRecord( CRow(1: JInt, "ddd"))) - assertEquals(4, testHarness.numProcessingTimeTimers()) - // 2 left timer(5,7), 2 left key(1,2), 2 left joincnt key(1,2) - // 2 right timer(6,7), 2 right key(1,2), 2 right joincnt key(1,2) - assertEquals(12, testHarness.numKeyedStateEntries()) + // lkeys(1, 2) rkeys(1, 2) timer_key_time(1:5, 2:6) + // l_join_cnt_keys(1, 2) r_join_cnt_keys(1, 2) + assertEquals(2, testHarness.numProcessingTimeTimers()) + assertEquals(10, testHarness.numKeyedStateEntries()) testHarness.setProcessingTime(4) testHarness.processElement1(new StreamRecord( CRow(false, 2: JInt, "aaa"))) testHarness.processElement2(new StreamRecord( CRow(false, 1: JInt, "ddd"))) - assertEquals(4, testHarness.numProcessingTimeTimers()) - // 2 left timer(5,7), 2 left key(1,2), 2 left joincnt key(1,2) - // 2 right timer(6,7), 2 right key(1,2), 2 right joincnt key(1,2) - assertEquals(12, testHarness.numKeyedStateEntries()) + // lkeys(1, 2) rkeys(1, 2) timer_key_time(1:8, 2:6) + // l_join_cnt_keys(1, 2) r_join_cnt_keys(1, 2) + assertEquals(2, testHarness.numProcessingTimeTimers()) + assertEquals(10, testHarness.numKeyedStateEntries()) 
testHarness.setProcessingTime(5) - assertEquals(3, testHarness.numProcessingTimeTimers()) - // 1 left timer(7), 1 left key(2), 1 left joincnt key(2) - // 2 right timer(6,7), 2 right key(1,2), 2 right joincnt key(1,2) - assertEquals(9, testHarness.numKeyedStateEntries()) + assertEquals(2, testHarness.numProcessingTimeTimers()) + assertEquals(10, testHarness.numKeyedStateEntries()) testHarness.setProcessingTime(6) - assertEquals(2, testHarness.numProcessingTimeTimers()) - // 1 left timer(7), 1 left key(2), 1 left joincnt key(2) - // 1 right timer(7), 1 right key(2), 1 right joincnt key(2) - assertEquals(6, testHarness.numKeyedStateEntries()) + // lkeys(1) rkeys(1) timer_key_time(1:8) + // l_join_cnt_keys(1) r_join_cnt_keys(1) + assertEquals(1, testHarness.numProcessingTimeTimers()) + assertEquals(5, testHarness.numKeyedStateEntries()) testHarness.setProcessingTime(7) - assertEquals(0, testHarness.numProcessingTimeTimers()) - assertEquals(0, testHarness.numKeyedStateEntries()) + assertEquals(1, testHarness.numProcessingTimeTimers()) + assertEquals(5, testHarness.numKeyedStateEntries()) testHarness.setProcessingTime(8) + assertEquals(0, testHarness.numProcessingTimeTimers()) + assertEquals(0, testHarness.numKeyedStateEntries()) testHarness.processElement1(new StreamRecord( CRow(1: JInt, "bbb"))) testHarness.processElement2(new StreamRecord( diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala index 7f9c0ef2553..25395be7528 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala @@ -80,8 +80,8 @@ class StateCleaningCountTriggerHarnessTest { 
TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) - // have two timers 6001 and 7002 - assertEquals(2, testHarness.numProcessingTimeTimers) + // have one timer 7002 + assertEquals(1, testHarness.numProcessingTimeTimers) assertEquals(0, testHarness.numEventTimeTimers) assertEquals(2, testHarness.numStateEntries) assertEquals(2, testHarness.numStateEntries(GlobalWindow.get)) @@ -116,9 +116,6 @@ class StateCleaningCountTriggerHarnessTest { // try to trigger onProcessingTime method via 7002, and all states are cleared val timesIt = testHarness.advanceProcessingTime(7002).iterator() - assertEquals( - TriggerResult.CONTINUE, - timesIt.next().f1) assertEquals( TriggerResult.FIRE_AND_PURGE, diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala index fe90a5f3300..1c02889513d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala @@ -110,7 +110,7 @@ private class MockedKeyedProcessFunction(queryConfig: StreamQueryConfig) out: Collector[String]): Unit = { val curTime = ctx.timerService().currentProcessingTime() - registerProcessingCleanupTimer(ctx, curTime) + processCleanupTimer(ctx, curTime) state.update(value._2) } @@ -119,8 +119,12 @@ private class MockedKeyedProcessFunction(queryConfig: StreamQueryConfig) ctx: KeyedProcessFunction[String, (String, String), String]#OnTimerContext, out: Collector[String]): Unit = { - if (needToCleanupState(timestamp)) { - cleanupState(state) + if (stateCleaningEnabled) { + val cleanupTime = cleanupTimeState.value() + if (null != cleanupTime && timestamp == 
cleanupTime) { + // clean up + cleanupState(state) + } } } } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala index 519b03f59b7..6c0ca1a5012 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala @@ -110,7 +110,7 @@ private class MockedProcessFunction(queryConfig: StreamQueryConfig) out: Collector[String]): Unit = { val curTime = ctx.timerService().currentProcessingTime() - registerProcessingCleanupTimer(ctx, curTime) + processCleanupTimer(ctx, curTime) state.update(value._2) } @@ -119,7 +119,7 @@ private class MockedProcessFunction(queryConfig: StreamQueryConfig) ctx: ProcessFunction[(String, String), String]#OnTimerContext, out: Collector[String]): Unit = { - if (needToCleanupState(timestamp)) { + if (stateCleaningEnabled) { cleanupState(state) } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services