Hi Utopia,

The behavior may be correct.
First, the default value is null, and that is the correct value. `ValueStateDescriptor` has multiple constructors, and some of them let you specify a default value. However, those constructors are deprecated and the documentation does not recommend them [1]. With the other constructors, which cannot take a default value, the state starts out as null.

Second, there is a `keyBy` operation before the window, which partitions your data. For each partition, the value state defaults to null.

Best,
Vino

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/index.html?org/apache/flink/api/common/state/ValueStateDescriptor.html

Utopia <gejunwei...@gmail.com> wrote on Wed, Dec 18, 2019 at 7:20 PM:

> Hi,
>
> I want to get the last value stored in ValueState when processing element
> in Trigger.
>
> But as the log shows that sometimes I can get the value, sometimes not.
>
> Only one key in my data(SensorReading).
>
> ValueState:
>
> class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {
>
>   private lazy val descriptor = new ValueStateDescriptor[Long]("desc", classOf[Long])
>
>   var value = 1
>
>   override def onElement(r: SensorReading, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
>
>     println("before update value : " + ctx.getPartitionedState(descriptor).value())
>
>     ctx.getPartitionedState(descriptor).update(value)
>
>     value += 1
>
>     println("after update value: " + ctx.getPartitionedState(descriptor).value())
>
>     ctx.registerProcessingTimeTimer(window.maxTimestamp)
>     TriggerResult.CONTINUE
>   }
>
>   override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext) = TriggerResult.CONTINUE
>
>   override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext) = TriggerResult.FIRE
>
>   override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
>     ctx.deleteProcessingTimeTimer(window.maxTimestamp)
>   }
>
>   override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit = {
>     val windowMaxTimestamp = window.maxTimestamp
>     if (windowMaxTimestamp > ctx.getCurrentProcessingTime)
>       ctx.registerProcessingTimeTimer(windowMaxTimestamp)
>   }
>
>   override def canMerge: Boolean = true
>
> }
>
>
> Main process:
>
> object MyCustomWindows {
>
>   def main(args: Array[String]): Unit = {
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     env.getConfig.setAutoWatermarkInterval(1000L)
>
>     val sensorData: DataStream[SensorReading] = env
>       .addSource(new SensorSource)
>       .assignTimestampsAndWatermarks(new SensorTimeAssigner)
>
>     val countsPerThirtySecs = sensorData
>       .keyBy(_.id)
>       .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
>       .trigger(new ProcessingTimeTrigger)
>       .process(new CountFunction)
>
>     env.execute()
>   }
> }
>
>
> Log results:
>
> before update value : null
> after update value: 1
> before update value : null
> after update value: 2
> before update value : null
> after update value: 3
> before update value : 3
> after update value: 4
> before update value : null
> after update value: 5
> before update value : null
> after update value: 6
> before update value : null
> after update value: 7
> before update value : null
> after update value: 8
> before update value : null
> after update value: 9
> before update value : 9
> after update value: 10
>
>
>
> Best regards
> Utopia
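
P.S. Since a descriptor without a default value makes the state read as null until the first update, one way to handle that in the trigger is to wrap the read in an Option. The following is only a sketch, assuming Flink 1.9 on the classpath. The object name `LastValue` and the helper `lastValue` are made up for illustration and are not part of Flink. It uses `java.lang.Long` rather than Scala's `Long` so that the null case is explicit in the types.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.windowing.triggers.Trigger

// Hypothetical helper, not part of Flink.
object LastValue {

  // Non-deprecated constructor: no default value, so an unset state reads as null.
  val descriptor =
    new ValueStateDescriptor[java.lang.Long]("desc", classOf[java.lang.Long])

  // Returns None when nothing has been stored yet for the current scope,
  // and Some(v) once update(v) has been called.
  def lastValue(ctx: Trigger.TriggerContext): Option[Long] = {
    val state: ValueState[java.lang.Long] = ctx.getPartitionedState(descriptor)
    Option(state.value()).map(_.longValue())
  }
}
```

Inside `onElement`, calling `LastValue.lastValue(ctx)` would then give `None` in the cases where the log above prints null, and `Some(n)` otherwise, so the "no value yet" case can be handled with a pattern match or `getOrElse`.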