Hi,

Thank you very much, Chesnay, for this clarification.

2017-09-11 19:36 GMT+02:00 Chesnay Schepler <ches...@apache.org>:

> Hello,
>
> state is local to each parallel instance of an operator. Coupled with the
> fact that the "map" method is always called by the same thread (and never
> concurrently) the ValueState (or any state for that matter) will always
> return the latest values.
>
>
> On 10.09.2017 14:39, Federico D'Ambrosio wrote:
>
> Hi,
>
> as per the mail subject I wanted to ask you if a State access (read and
> write) is synchronized.
>
> I have the following stream:
>
>     val airtrafficEvents = stream
>       .keyBy(_.flightInfo.flight)
>       .map(new UpdateIdFunction())
>
> where UpdateIdFunction is a RichMapFunction with a ValueState and a
> MapState, with the following map method
>
>     def map(value: AirTrafficEvent): AirTrafficEventWithId = {
>
>       val flight = value.flightInfo.flight
>       val time = value.instantValues.time
>
>       AirTrafficEventWithId(value, createOrGetId(flight, time.getMillis))
>
>     }
>
>     private def createOrGetId(_key: String, _time: Long): Int = {
>
>       val tmpId = valuestate.value
>
>       //Remove from MapState entries older than one minute
>
>       val entry = Option[(Int, Long)](lookupMap.get(_key))
>
>       //update ValueState or MapState if needed
>
>       //return current updated ValueState or corresponding ID from updated MapState
>
>     }
>
> So, I'm using the MapState to track the integer IDs of the events of the
> stream, retaining only the latest records inside the MapState, and I'm
> using the ValueState to generate an incremental integer ID for said events.
> Given all of this, I'm really not sure how the mapping is applied to the
> keyedstream in input: is it guaranteed that each time the method is called
> I'm getting the latest and updated value/map?
>
> Thank you for your attention,
> Federico

--
Federico D'Ambrosio
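
For reference, here is a minimal sketch of the pattern discussed above: a RichMapFunction that reads and updates keyed state inside map without any explicit synchronization. It assumes the Flink 1.x DataStream API, where open takes a Configuration. The Event and EventWithId types, the state names, and the ID logic are hypothetical simplifications, not the UpdateIdFunction from the thread.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration

// Hypothetical, simplified stand-ins for AirTrafficEvent / AirTrafficEventWithId.
case class Event(flight: String, timeMillis: Long)
case class EventWithId(event: Event, id: Int)

class CountingIdFunction extends RichMapFunction[Event, EventWithId] {

  // State handles are obtained once per parallel instance in open().
  // On a keyed stream, the values behind them are scoped to the current key.
  @transient private var nextId: ValueState[java.lang.Integer] = _
  @transient private var lastSeen: MapState[String, java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    nextId = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Integer]("next-id", classOf[java.lang.Integer]))
    lastSeen = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, java.lang.Long](
        "last-seen", classOf[String], classOf[java.lang.Long]))
  }

  override def map(value: Event): EventWithId = {
    // value() returns null until the state has been written for this key.
    val current = Option(nextId.value()).map(_.intValue).getOrElse(0)

    // No locking: map is invoked by a single thread per parallel instance,
    // so this read-modify-write always sees the latest value.
    nextId.update(current + 1)
    lastSeen.put(value.flight, value.timeMillis)

    EventWithId(value, current)
  }
}
```

It would be applied as in the original snippet, e.g. stream.keyBy(_.flight).map(new CountingIdFunction()). Because keyed state is scoped to the current key, the counter in this sketch increments per flight rather than across the whole stream.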