Hi Utopia,

The behavior you are seeing is probably expected.

First, null is the correct default value here.
`ValueStateDescriptor` has several constructors, and some of them let you
specify a default value. However, those constructors are deprecated and
the documentation does not recommend them.[1] With the other constructors,
which take no default value, reading a state that has not been set yet
returns null.
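
As a rough sketch of how the null case could be handled without the
deprecated constructors, the read can be wrapped in an `Option`. The names
`NullSafeState` and `readOrElse` are only illustrative and not part of
Flink, and I use `java.lang.Long` so that the null is visible in the type:

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}

object NullSafeState {
  // Descriptor created without a default value: an unset state reads as null.
  val descriptor =
    new ValueStateDescriptor[java.lang.Long]("desc", classOf[java.lang.Long])

  // Hypothetical helper: wrap the nullable read in Option and let the
  // caller choose the fallback instead of relying on a descriptor default.
  def readOrElse(state: ValueState[java.lang.Long], fallback: Long): Long =
    Option(state.value()).map(_.longValue).getOrElse(fallback)
}
```

Inside `onElement` this could be called as
`NullSafeState.readOrElse(ctx.getPartitionedState(NullSafeState.descriptor), 0L)`.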

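Second, there is a `keyBy` operation before the window, which partitions
your data. State is kept separately for each partition (key), and in each
partition it starts out as null. In a Trigger, the state returned by
`ctx.getPartitionedState` is additionally scoped to the current window, so
it also starts out as null for every new window.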
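
To illustrate the scoping, here is a toy model in plain Scala. It is not
Flink's implementation, and `ScopedValueState` is a made-up name. It
assumes, as far as I understand the Javadoc of
`Trigger.TriggerContext#getPartitionedState`, that the state is scoped to
the key and the window of the current trigger invocation:

```scala
import scala.collection.mutable

// Toy model, NOT Flink's implementation: one value slot per (key, window).
final class ScopedValueState[K, W, V] {
  private val slots = mutable.Map.empty[(K, W), V]

  // Reading a slot that was never written yields None (null in Flink).
  def value(key: K, window: W): Option[V] = slots.get((key, window))

  def update(key: K, window: W, v: V): Unit = slots((key, window)) = v
}

object ScopedStateDemo {
  def main(args: Array[String]): Unit = {
    val state = new ScopedValueState[String, (Long, Long), Long]
    val w1 = (0L, 1000L) // windows modeled as (start, end) pairs
    val w2 = (5L, 1005L)

    state.update("sensor_1", w1, 1L)
    println(state.value("sensor_1", w1)) // Some(1): same key, same window
    println(state.value("sensor_1", w2)) // None: same key, different window
    println(state.value("sensor_2", w1)) // None: different key
  }
}
```

In this model a value written under one window is not visible under
another, even for the same key.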

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/index.html?org/apache/flink/api/common/state/ValueStateDescriptor.html

Utopia <gejunwei...@gmail.com> wrote on Wed, Dec 18, 2019 at 7:20 PM:

> Hi,
>
> I want to get the last value stored in ValueState when processing an
> element in a Trigger.
>
> But as the log shows, sometimes I can get the value and sometimes I cannot.
>
> There is only one key in my data (SensorReading).
>
> ValueState:
>
> class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {
>
>   private lazy val descriptor = new ValueStateDescriptor[Long]("desc", classOf[Long])
>
>   var value = 1
>
>   override def onElement(r: SensorReading, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
>
>     println("before update value : " + ctx.getPartitionedState(descriptor).value())
>
>     ctx.getPartitionedState(descriptor).update(value)
>
>     value += 1
>
>     println("after update value: " + ctx.getPartitionedState(descriptor).value())
>
>     ctx.registerProcessingTimeTimer(window.maxTimestamp)
>     TriggerResult.CONTINUE
>   }
>
>   override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext) = TriggerResult.CONTINUE
>
>   override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext) = TriggerResult.FIRE
>
>   override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
>     ctx.deleteProcessingTimeTimer(window.maxTimestamp)
>   }
>
>   override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit = {
>     val windowMaxTimestamp = window.maxTimestamp
>     if (windowMaxTimestamp > ctx.getCurrentProcessingTime) ctx.registerProcessingTimeTimer(windowMaxTimestamp)
>   }
>
>   override def canMerge: Boolean = true
>
> }
>
>
> Main process:
>
> object MyCustomWindows {
>
>   def main(args: Array[String]): Unit = {
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     env.getConfig.setAutoWatermarkInterval(1000L)
>
>     val sensorData: DataStream[SensorReading] = env
>       .addSource(new SensorSource)
>       .assignTimestampsAndWatermarks(new SensorTimeAssigner)
>
>     val countsPerThirtySecs = sensorData
>       .keyBy(_.id)
>       .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
>       .trigger(new ProcessingTimeTrigger)
>       .process(new CountFunction)
>
>     env.execute()
>   }
> }
>
>
> Log results:
>
> before update value : null
> after update value: 1
> before update value : null
> after update value: 2
> before update value : null
> after update value: 3
> before update value : 3
> after update value: 4
> before update value : null
> after update value: 5
> before update value : null
> after update value: 6
> before update value : null
> after update value: 7
> before update value : null
> after update value: 8
> before update value : null
> after update value: 9
> before update value : 9
> after update value: 10
>
>
>
> Best regards
> Utopia
>
