Hi,
I want to get the last value stored in ValueState when processing element in
Trigger.
But as the log shows that sometimes I can get the value, sometimes not.
Only one key in my data(SensorReading).
ValueState:
class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {
private lazy val descriptor = new ValueStateDescriptor[Long]("desc",
classOf[Long])
var value = 1
override def onElement( r: SensorReading, timestamp: Long, window: TimeWindow,
ctx: Trigger.TriggerContext): TriggerResult = {
println("before update value : " +
ctx.getPartitionedState(descriptor).value())
ctx.getPartitionedState(descriptor).update(value)
value += 1
println("after update value: " + ctx.getPartitionedState(descriptor).value())
ctx.registerProcessingTimeTimer(window.maxTimestamp)
TriggerResult.CONTINUE
}
override def onEventTime(time: Long, window: TimeWindow, ctx:
Trigger.TriggerContext) = TriggerResult.CONTINUE
override def onProcessingTime(time: Long, window: TimeWindow, ctx:
Trigger.TriggerContext) = TriggerResult.FIRE
override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
ctx.deleteProcessingTimeTimer(window.maxTimestamp)
}
override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit = {
val windowMaxTimestamp = window.maxTimestamp
if (windowMaxTimestamp > ctx.getCurrentProcessingTime)
ctx.registerProcessingTimeTimer(windowMaxTimestamp)
}
override def canMerge: Boolean = true
}
Main process:
object MyCustomWindows {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(1000L)
val sensorData: DataStream[SensorReading] = env
.addSource(new SensorSource)
.assignTimestampsAndWatermarks(new SensorTimeAssigner)
val countsPerThirtySecs = sensorData
.keyBy(_.id)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
.trigger(new ProcessingTimeTrigger)
.process(new CountFunction)
env.execute()
}
}
Log results:
before update value : null
after update value: 1
before update value : null
after update value: 2
before update value : null
after update value: 3
before update value : 3
after update value: 4
before update value : null
after update value: 5
before update value : null
after update value: 6
before update value : null
after update value: 7
before update value : null
after update value: 8
before update value : null
after update value: 9
before update value : 9
after update value: 10
Best regards
Utopia