Hi Utopia,

IMO, your analysis is correct.
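
To make the scoping concrete, here is a toy model in plain Scala. It is not Flink code: `ScopedState`, `extend` and `run` are hypothetical names, and the sketch assumes that the trigger state is looked up under the merged session window, which grows each time an element with a later timestamp arrives. It compares a slot scoped to (key, window) with a slot scoped to the key only.

```scala
object ScopedStateSketch {
  final case class Window(start: Long, end: Long)

  // Hypothetical stand-in for a state backend: one slot per scope S.
  final class ScopedState[S] {
    private val slots = scala.collection.mutable.Map.empty[S, Long]
    def value(scope: S): Option[Long] = slots.get(scope)
    def update(scope: S, v: Long): Unit = slots(scope) = v
  }

  // Extend the running session so that it also covers [ts, ts + gap).
  // Assumes every arrival falls within the gap of the current session.
  def extend(session: Option[Window], ts: Long, gap: Long): Window =
    session match {
      case Some(w) => Window(math.min(w.start, ts), math.max(w.end, ts + gap))
      case None    => Window(ts, ts + gap)
    }

  // For each arrival, returns the "before update" reading from
  // (state scoped to key and window, state scoped to key only).
  def run(arrivals: Seq[Long], gap: Long): Seq[(Option[Long], Option[Long])] = {
    val key = "sensor_1" // single key, as in the original job
    val perWindow = new ScopedState[(String, Window)]
    val perKey = new ScopedState[String]
    var session: Option[Window] = None
    var counter = 0L
    arrivals.map { ts =>
      val w = extend(session, ts, gap)
      session = Some(w)
      counter += 1
      val before = (perWindow.value((key, w)), perKey.value(key))
      perWindow.update((key, w), counter)
      perKey.update(key, counter)
      before
    }
  }

  def main(args: Array[String]): Unit =
    // Arrival times in ms; the last two share the same millisecond.
    run(Seq(10L, 20L, 30L, 30L), gap = 1000L).foreach(println)
}
```

With these arrivals the sketch prints (None,None), (None,Some(1)), (None,Some(2)) and (Some(3),Some(3)). The window-scoped slot only returns a value when two elements land in exactly the same window, which would match the occasional "before update value : 3" in your log. The key-scoped slot always returns the previous value. In a real job, state obtained through getRuntimeContext.getState in a keyed function outside the window operator is scoped to the key only, so keeping the previous value there instead of in the Trigger may be an option.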

Best,
Vino

On Thu, Dec 19, 2019 at 12:44 AM, Utopia <gejunwei...@gmail.com> wrote:

> Hi Vino,
>
> It may be due to the window type. I am using
> ProcessingTimeSessionWindows, and the keyed state in a Trigger is scoped to
> *window and key*. When the window changes, the ValueState is a different one.
>
> Best regards
> Utopia
> On Dec 18, 2019, 22:30 +0800, Utopia <gejunwei...@gmail.com> wrote:
>
> Hi Vino,
>
> Thanks for your reply!
>
> All of my input data has the same key, so I think there is only one
> partition.
>
> Why, then, can I sometimes get the value stored in the ValueState before
> the update?
>
>> before update value : 3
>> after update value: 4
>
> Also, how can I store the previous value so that I can read it when the
> next element comes in and onElement is invoked?
>
>
>
> Best regards
> Utopia
> On Dec 18, 2019, 21:57 +0800, vino yang <yanghua1...@gmail.com> wrote:
>
> Hi Utopia,
>
> The behavior may be correct.
>
> First, the default value is null, which is expected.
> `ValueStateDescriptor` has multiple constructors, and some of them let you
> specify a default value. However, those constructors are deprecated and
> the docs do not recommend them.[1] With the other constructors, which
> cannot take a default value, the state is null until it is first updated.
>
> Second, there is a `keyBy` operation before the window. It partitions
> your data, and for each partition the default value of the state is null.
>
> Best,
> Vino
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/index.html?org/apache/flink/api/common/state/ValueStateDescriptor.html
>
> On Wed, Dec 18, 2019 at 7:20 PM, Utopia <gejunwei...@gmail.com> wrote:
>
>> Hi,
>>
>> I want to get the last value stored in a ValueState when processing an
>> element in a Trigger.
>>
>> But as the log shows, sometimes I can get the value and sometimes I cannot.
>>
>> There is only one key in my data (SensorReading).
>>
>> ValueState:
>>
>> class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {
>>
>>   private lazy val descriptor = new ValueStateDescriptor[Long]("desc", classOf[Long])
>>
>>   var value = 1
>>
>>   override def onElement(r: SensorReading, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
>>
>>     println("before update value : " + ctx.getPartitionedState(descriptor).value())
>>
>>     ctx.getPartitionedState(descriptor).update(value)
>>
>>     value += 1
>>
>>     println("after update value: " + ctx.getPartitionedState(descriptor).value())
>>
>>     ctx.registerProcessingTimeTimer(window.maxTimestamp)
>>     TriggerResult.CONTINUE
>>   }
>>
>>   override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext) = TriggerResult.CONTINUE
>>
>>   override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext) = TriggerResult.FIRE
>>
>>   override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
>>     ctx.deleteProcessingTimeTimer(window.maxTimestamp)
>>   }
>>
>>   override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit = {
>>     val windowMaxTimestamp = window.maxTimestamp
>>     if (windowMaxTimestamp > ctx.getCurrentProcessingTime) ctx.registerProcessingTimeTimer(windowMaxTimestamp)
>>   }
>>
>>   override def canMerge: Boolean = true
>>
>> }
>>
>>
>> Main process:
>>
>> object MyCustomWindows {
>>
>>   def main(args: Array[String]): Unit = {
>>
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>     env.getConfig.setAutoWatermarkInterval(1000L)
>>
>>     val sensorData: DataStream[SensorReading] = env
>>       .addSource(new SensorSource)
>>       .assignTimestampsAndWatermarks(new SensorTimeAssigner)
>>
>>     val countsPerThirtySecs = sensorData
>>       .keyBy(_.id)
>>       .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
>>       .trigger(new ProcessingTimeTrigger)
>>       .process(new CountFunction)
>>
>>     env.execute()
>>   }
>> }
>>
>>
>> Log results:
>>
>> before update value : null
>> after update value: 1
>> before update value : null
>> after update value: 2
>> before update value : null
>> after update value: 3
>> before update value : 3
>> after update value: 4
>> before update value : null
>> after update value: 5
>> before update value : null
>> after update value: 6
>> before update value : null
>> after update value: 7
>> before update value : null
>> after update value: 8
>> before update value : null
>> after update value: 9
>> before update value : 9
>> after update value: 10
>>
>>
>>
>> Best regards
>> Utopia
>>
>
