Hi Vino,

Maybe it is due to the type of window. What I used is 
ProcessingTimeSessionWindows, while keyedState is scoped to window and key. 
Window changes so that the ValueState is different.

Best  regards
Utopia
在 2019年12月18日 +0800 22:30,Utopia <gejunwei...@gmail.com>,写道:
> Hi Vino,
>
> Thanks for your reply !
>
> The key of my input data is same value. So I think there is only one 
> partition.
>
> And Why sometimes I can get the value stored in the ValueState before update?
> > > > > before update value : 3
> > > > > after update value: 4
>
> What’s more, How can I stored the previous value so that I can get the value 
> when next element come in and invoke the onElement method?
>
>
>
> Best  regards
> Utopia
> 在 2019年12月18日 +0800 21:57,vino yang <yanghua1...@gmail.com>,写道:
> > Hi Utopia,
> >
> > The behavior may be correct.
> >
> > First, the default value is null. It's the correct value. 
> > `ValueStateDescriptor` has multiple constructors, some of them can let you 
> > specify a default value. However, these constructors are deprecated. And 
> > the doc does not recommend them.[1] For the other constructors which can 
> > not specify default values, it would be null.
> >
> > Second, before the window, there is a `keyBy` operation. it will partition 
> > your data. For each partition, the default value state is null.
> >
> > Best,
> > Vino
> >
> > [1]: 
> > https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/index.html?org/apache/flink/api/common/state/ValueStateDescriptor.html
> >
> > > Utopia <gejunwei...@gmail.com> 于2019年12月18日周三 下午7:20写道:
> > > > Hi,
> > > >
> > > > I want to get the last value stored in ValueState when processing 
> > > > element in Trigger.
> > > >
> > > > But as the log shows that sometimes I can get the value, sometimes not.
> > > >
> > > > Only one key in my data(SensorReading).
> > > >
> > > > ValueState:
> > > > class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {
> > > >
> > > > private lazy val descriptor = new ValueStateDescriptor[Long]("desc", 
> > > > classOf[Long])
> > > > var value = 1
> > > > override def onElement( r: SensorReading, timestamp: Long, window: 
> > > > TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
> > > >
> > > >   println("before update value : " + 
> > > > ctx.getPartitionedState(descriptor).value())
> > > >
> > > >    ctx.getPartitionedState(descriptor).update(value)
> > > >
> > > >    value += 1
> > > >
> > > >    println("after update value: " + 
> > > > ctx.getPartitionedState(descriptor).value())
> > > >
> > > >    ctx.registerProcessingTimeTimer(window.maxTimestamp)
> > > >    TriggerResult.CONTINUE
> > > > }
> > > >
> > > > override def onEventTime(time: Long, window: TimeWindow, ctx: 
> > > > Trigger.TriggerContext) = TriggerResult.CONTINUE
> > > >
> > > > override def onProcessingTime(time: Long, window: TimeWindow, ctx: 
> > > > Trigger.TriggerContext) = TriggerResult.FIRE
> > > >
> > > > override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): 
> > > > Unit = {
> > > >    ctx.deleteProcessingTimeTimer(window.maxTimestamp)
> > > >  }
> > > >
> > > > override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): 
> > > > Unit = {
> > > >    val windowMaxTimestamp = window.maxTimestamp
> > > >    if (windowMaxTimestamp > ctx.getCurrentProcessingTime) 
> > > > ctx.registerProcessingTimeTimer(windowMaxTimestamp)
> > > >  }
> > > >
> > > > override def canMerge: Boolean = true
> > > >
> > > > }
> > > >
> > > > Main process:
> > > > object MyCustomWindows {
> > > >
> > > > def main(args: Array[String]): Unit = {
> > > >
> > > >    val env = StreamExecutionEnvironment.getExecutionEnvironment
> > > >    env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
> > > >    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> > > >    env.getConfig.setAutoWatermarkInterval(1000L)
> > > >
> > > >    val sensorData: DataStream[SensorReading] = env
> > > >      .addSource(new SensorSource)
> > > >      .assignTimestampsAndWatermarks(new SensorTimeAssigner)
> > > >
> > > >    val countsPerThirtySecs = sensorData
> > > >      .keyBy(_.id)
> > > >     .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
> > > >      .trigger(new ProcessingTimeTrigger)
> > > >      .process(new CountFunction)
> > > >
> > > >    env.execute()
> > > >  }
> > > > }
> > > >
> > > > Log results:
> > > >
> > > > > before update value : null
> > > > > after update value: 1
> > > > > before update value : null
> > > > > after update value: 2
> > > > > before update value : null
> > > > > after update value: 3
> > > > > before update value : 3
> > > > > after update value: 4
> > > > > before update value : null
> > > > > after update value: 5
> > > > > before update value : null
> > > > > after update value: 6
> > > > > before update value : null
> > > > > after update value: 7
> > > > > before update value : null
> > > > > after update value: 8
> > > > > before update value : null
> > > > > after update value: 9
> > > > > before update value : 9
> > > > > after update value: 10
> > > >
> > > >
> > > > Best  regards
> > > > Utopia

Reply via email to