Hi,

Thank you very much, Chesnay, for this clarification.
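
For anyone finding this thread in the archive, here is a minimal sketch of how I now understand the answer quoted below. It assumes the Flink 1.x API; CountPerKey, the state name "counter" and the (String, Long) input type are hypothetical names chosen for illustration, not my actual job.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration

// Hypothetical example: counts events per key using keyed ValueState.
// Must be applied after keyBy, otherwise keyed state is not available.
class CountPerKey extends RichMapFunction[(String, Long), (String, Int)] {

  // State handle, created once per parallel instance in open().
  @transient private var counter: ValueState[Integer] = _

  override def open(parameters: Configuration): Unit = {
    counter = getRuntimeContext.getState(
      new ValueStateDescriptor[Integer]("counter", classOf[Integer]))
  }

  // Called by a single thread per parallel instance, never concurrently,
  // so this read-modify-write needs no explicit synchronization.
  override def map(event: (String, Long)): (String, Int) = {
    // value() returns null the first time a key is seen.
    val current = Option(counter.value()).map(_.intValue).getOrElse(0)
    val next = current + 1
    counter.update(next)
    (event._1, next)
  }
}
```

Applied as stream.keyBy(_._1).map(new CountPerKey()), each call to map should see the value written by the previous call for the same key, since the state is scoped to the current key within that parallel instance.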

2017-09-11 19:36 GMT+02:00 Chesnay Schepler <ches...@apache.org>:

> Hello,
>
> State is local to each parallel instance of an operator. Coupled with the
> fact that the "map" method is always called by the same thread (and never
> concurrently), the ValueState (or any state, for that matter) will always
> return the latest values.
>
>
> On 10.09.2017 14:39, Federico D'Ambrosio wrote:
>
> Hi,
>
> As per the mail subject, I wanted to ask whether state access (read and
> write) is synchronized.
>
> I have the following stream:
>
> val airtrafficEvents = stream
>   .keyBy(_.flightInfo.flight)
>   .map(new UpdateIdFunction())
>
>
> where UpdateIdFunction is a RichMapFunction with a ValueState and a
> MapState, with the following map method
>
> def map(value: AirTrafficEvent): AirTrafficEventWithId = {
>
>   val flight = value.flightInfo.flight
>   val time = value.instantValues.time
>
>   AirTrafficEventWithId(value, createOrGetId(flight, time.getMillis))
>
> }
>
> private def createOrGetId(_key: String, _time: Long): Int = {
>
>   val tmpId = valuestate.value
>
>   //Remove from MapState entries older than one minute
>
>   val entry = Option[(Int, Long)](lookupMap.get(_key))
>
>   //update ValueState or MapState if needed
>
>   //return current updated ValueState or corresponding ID from updated MapState
>
> }
>
> So, I'm using the MapState to track the integer IDs of the events of the
> stream, retaining only the latest records inside the MapState, and I'm
> using the ValueState to generate an incremental integer ID for said events.
> Given all of this, I'm really not sure how the mapping is applied to the
> input KeyedStream: is it guaranteed that each time the method is called
> I'm getting the latest, updated value/map?
>
> Thank you for your attention,
> Federico
>
>
>


-- 
Federico D'Ambrosio
