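Hi Vino,

Maybe it is due to the window type. I am using ProcessingTimeSessionWindows, and the partitioned state in a Trigger is scoped to both the key and the window. With session windows the window changes as elements arrive, so the ValueState I read belongs to a different window each time.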
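To illustrate what I mean, here is a minimal plain-Scala sketch with no Flink dependency. It only models the idea that state is looked up by (key, window) and that a session window grows whenever a new element overlaps it. The names `Window`, `simulate` and `WindowScopedStateDemo` are made up for this example and are not Flink API, and the merging rule is my assumption about how session windows behave, not a copy of Flink's implementation.

```scala
// Plain-Scala model of state scoped to (key, window); not Flink code.
object WindowScopedStateDemo {

  // Hypothetical stand-in for a time window covering [start, end).
  final case class Window(start: Long, end: Long) {
    def intersects(o: Window): Boolean = start <= o.end && o.start <= end
    def cover(o: Window): Window =
      Window(math.min(start, o.start), math.max(end, o.end))
  }

  // For each arrival time, returns the value read "before update".
  // None plays the role of the null printed in the log.
  def simulate(key: String, arrivals: Seq[Long], gap: Long): Seq[Option[Long]] = {
    val state = scala.collection.mutable.Map.empty[(String, Window), Long]
    var session: Option[Window] = None
    var counter = 1L
    arrivals.toList.map { t =>
      // Every element starts as its own window and is merged into the
      // current session if the two overlap (assumed merging rule).
      val fresh = Window(t, t + gap)
      val merged = session.filter(_.intersects(fresh)).map(_.cover(fresh)).getOrElse(fresh)
      session = Some(merged)
      // State is addressed by (key, window): a changed window is a new slot.
      val before = state.get((key, merged))
      state((key, merged)) = counter
      counter += 1
      before
    }
  }

  def main(args: Array[String]): Unit =
    simulate("sensor_1", Seq(0L, 5L, 5L, 9L), 1000L).foreach(println)
}
```

In this model the previous value is only visible when an element leaves the merged window unchanged, for example when two elements arrive with the same timestamp. That would be consistent with the occasional non-null "before update" lines in my log, but I have not verified it against the Flink source.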

Best regards
Utopia

On 18 Dec 2019 at 22:30 +0800, Utopia <gejunwei...@gmail.com> wrote:
> Hi Vino,
>
> Thanks for your reply!
>
> All of my input data has the same key, so I think there is only one
> partition.
>
> And why can I sometimes get the value stored in the ValueState before the
> update?
>
> > > > > before update value : 3
> > > > > after update value: 4
>
> What's more, how can I store the previous value so that I can read it when
> the next element comes in and invokes the onElement method?
>
> Best regards
> Utopia
>
> On 18 Dec 2019 at 21:57 +0800, vino yang <yanghua1...@gmail.com> wrote:
> > Hi Utopia,
> >
> > The behavior may be correct.
> >
> > First, the default value is null. It's the correct value.
> > `ValueStateDescriptor` has multiple constructors, some of which let you
> > specify a default value. However, these constructors are deprecated, and
> > the doc does not recommend them.[1] With the other constructors, which
> > cannot specify a default value, it is null.
> >
> > Second, before the window, there is a `keyBy` operation. It will partition
> > your data. For each partition, the default state value is null.
> >
> > Best,
> > Vino
> >
> > [1]:
> > https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/index.html?org/apache/flink/api/common/state/ValueStateDescriptor.html
> >
> > > On Wed, 18 Dec 2019 at 7:20 PM, Utopia <gejunwei...@gmail.com> wrote:
> > > > Hi,
> > > >
> > > > I want to get the last value stored in ValueState when processing an
> > > > element in a Trigger.
> > > >
> > > > But as the log shows, sometimes I can get the value and sometimes not.
> > > >
> > > > There is only one key in my data (SensorReading).
> > > >
> > > > ValueState:
> > > >
> > > > import org.apache.flink.api.common.state.ValueStateDescriptor
> > > > import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
> > > > import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> > > >
> > > > class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {
> > > >
> > > >   private lazy val descriptor = new ValueStateDescriptor[Long]("desc", classOf[Long])
> > > >   var value = 1
> > > >
> > > >   override def onElement(r: SensorReading, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
> > > >
> > > >     println("before update value : " + ctx.getPartitionedState(descriptor).value())
> > > >
> > > >     ctx.getPartitionedState(descriptor).update(value)
> > > >
> > > >     value += 1
> > > >
> > > >     println("after update value: " + ctx.getPartitionedState(descriptor).value())
> > > >
> > > >     ctx.registerProcessingTimeTimer(window.maxTimestamp)
> > > >     TriggerResult.CONTINUE
> > > >   }
> > > >
> > > >   override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext) = TriggerResult.CONTINUE
> > > >
> > > >   override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext) = TriggerResult.FIRE
> > > >
> > > >   override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
> > > >     ctx.deleteProcessingTimeTimer(window.maxTimestamp)
> > > >   }
> > > >
> > > >   override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit = {
> > > >     val windowMaxTimestamp = window.maxTimestamp
> > > >     if (windowMaxTimestamp > ctx.getCurrentProcessingTime)
> > > >       ctx.registerProcessingTimeTimer(windowMaxTimestamp)
> > > >   }
> > > >
> > > >   override def canMerge: Boolean = true
> > > >
> > > > }
> > > >
> > > > Main process:
> > > >
> > > > import org.apache.flink.streaming.api.TimeCharacteristic
> > > > import org.apache.flink.streaming.api.scala._
> > > > import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
> > > > import org.apache.flink.streaming.api.windowing.time.Time
> > > >
> > > > object MyCustomWindows {
> > > >
> > > >   def main(args: Array[String]): Unit = {
> > > >
> > > >     val env = StreamExecutionEnvironment.getExecutionEnvironment
> > > >     env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
> > > >     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> > > >     env.getConfig.setAutoWatermarkInterval(1000L)
> > > >
> > > >     val sensorData: DataStream[SensorReading] = env
> > > >       .addSource(new SensorSource)
> > > >       .assignTimestampsAndWatermarks(new SensorTimeAssigner)
> > > >
> > > >     val countsPerThirtySecs = sensorData
> > > >       .keyBy(_.id)
> > > >       .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
> > > >       .trigger(new ProcessingTimeTrigger)
> > > >       .process(new CountFunction)
> > > >
> > > >     env.execute()
> > > >   }
> > > > }
> > > >
> > > > Log results:
> > > >
> > > > > before update value : null
> > > > > after update value: 1
> > > > > before update value : null
> > > > > after update value: 2
> > > > > before update value : null
> > > > > after update value: 3
> > > > > before update value : 3
> > > > > after update value: 4
> > > > > before update value : null
> > > > > after update value: 5
> > > > > before update value : null
> > > > > after update value: 6
> > > > > before update value : null
> > > > > after update value: 7
> > > > > before update value : null
> > > > > after update value: 8
> > > > > before update value : null
> > > > > after update value: 9
> > > > > before update value : 9
> > > > > after update value: 10
> > > >
> > > > Best regards
> > > > Utopia