Hi Utopia, IMO, your analysis is correct.
Best, Vino Utopia <gejunwei...@gmail.com> 于2019年12月19日周四 上午12:44写道: > Hi Vino, > > Maybe it is due to the type of window. What I used is > ProcessingTimeSessionWindows, while keyedState is scoped to *window and > key*. Window changes so that the ValueState is different. > > Best regards > Utopia > 在 2019年12月18日 +0800 22:30,Utopia <gejunwei...@gmail.com>,写道: > > Hi Vino, > > Thanks for your reply ! > > The key of my input data is same value. So I think there is only one > partition. > > And Why sometimes I can get the value stored in the ValueState before > update? > > before update value : 3 >> >> after update value: 4 >> >> > What’s more, How can I stored the previous value so that I can get the > value when next element come in and invoke the onElement method? > > > > Best regards > Utopia > 在 2019年12月18日 +0800 21:57,vino yang <yanghua1...@gmail.com>,写道: > > Hi Utopia, > > The behavior may be correct. > > First, the default value is null. It's the correct value. > `ValueStateDescriptor` has multiple constructors, some of them can let you > specify a default value. However, these constructors are deprecated. And > the doc does not recommend them.[1] For the other constructors which can > not specify default values, it would be null. > > Second, before the window, there is a `keyBy` operation. it will partition > your data. For each partition, the default value state is null. > > Best, > Vino > > [1]: > https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/index.html?org/apache/flink/api/common/state/ValueStateDescriptor.html > > Utopia <gejunwei...@gmail.com> 于2019年12月18日周三 下午7:20写道: > >> Hi, >> >> I want to get the last value stored in ValueState when processing element >> in Trigger. >> >> But as the log shows that sometimes I can get the value, sometimes not. >> >> Only one key in my data(SensorReading). >> >> ValueState: >> >> class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] { >> >> private lazy val descriptor = new ValueStateDescriptor[Long]("desc", >> classOf[Long]) >> >> var value = 1 >> >> override def onElement( r: SensorReading, timestamp: Long, window: >> TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = { >> >> println("before update value : " + >> ctx.getPartitionedState(descriptor).value()) >> >> ctx.getPartitionedState(descriptor).update(value) >> >> value += 1 >> >> println("after update value: " + >> ctx.getPartitionedState(descriptor).value()) >> >> ctx.registerProcessingTimeTimer(window.maxTimestamp) >> TriggerResult.CONTINUE >> } >> >> override def onEventTime(time: Long, window: TimeWindow, ctx: >> Trigger.TriggerContext) = TriggerResult.CONTINUE >> >> override def onProcessingTime(time: Long, window: TimeWindow, ctx: >> Trigger.TriggerContext) = TriggerResult.FIRE >> >> override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit >> = { >> ctx.deleteProcessingTimeTimer(window.maxTimestamp) >> } >> >> override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): >> Unit = { >> val windowMaxTimestamp = window.maxTimestamp >> if (windowMaxTimestamp > ctx.getCurrentProcessingTime) >> ctx.registerProcessingTimeTimer(windowMaxTimestamp) >> } >> >> override def canMerge: Boolean = true >> >> } >> >> >> Main process: >> >> object MyCustomWindows { >> >> def main(args: Array[String]): Unit = { >> >> val env = StreamExecutionEnvironment.getExecutionEnvironment >> env.getCheckpointConfig.setCheckpointInterval(10 * 1000) >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) >> env.getConfig.setAutoWatermarkInterval(1000L) >> >> val sensorData: DataStream[SensorReading] = env >> .addSource(new SensorSource) >> .assignTimestampsAndWatermarks(new SensorTimeAssigner) >> >> val countsPerThirtySecs = sensorData >> .keyBy(_.id) >> .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000))) >> .trigger(new ProcessingTimeTrigger) >> .process(new CountFunction) >> >> env.execute() >> } >> } >> >> >> Log results: >> >> before update value : null >> after update value: 1 >> before update value : null >> after update value: 2 >> before update value : null >> after update value: 3 >> before update value : 3 >> after update value: 4 >> before update value : null >> after update value: 5 >> before update value : null >> after update value: 6 >> before update value : null >> after update value: 7 >> before update value : null >> after update value: 8 >> before update value : null >> after update value: 9 >> before update value : 9 >> after update value: 10 >> >> >> >> Best regards >> Utopia >> >