Hi, as per the mail subject I wanted to ask you if a State access (read and write) is synchronized.
I have the following stream: val airtrafficEvents = stream .keyBy(_.flightInfo.flight) .map(new UpdateIdFunction()) where UpdateIdFunction is a RichMapFunction with a ValueState and a MapState, with the following map method def map(value: AirTrafficEvent): AirTrafficEventWithId = { val flight = value.flightInfo.flight val time = value.instantValues.time AirTrafficEventWithId(value, createOrGetId(flight, time.getMillis)) } private def createOrGetId(_key: String, _time: Long): Int = { val tmpId = valuestate.value //Remove from MapState entries older than one minute val entry = Option[(Int, Long)](lookupMap.get(_key)) //update ValueState or MapState if needed //return current updated ValueState or corresponding ID from updated MapState } So, I'm using the MapState to track the integer IDs of the events of the stream, retaining only the latest records inside the MapState, and I'm using the ValueState to generate an incremental integer ID for said events. Given all of this, I'm really not sure how the mapping is applied to the keyedstream in input: is it guaranteed that each time the method is called I'm getting the latest and updated value/map? Thank you for your attention, Federico