Hi Michal,

If I understood your requirements correctly, you could solve this by serving the updates through a regular DataStream. Add a SourceFunction that periodically emits a new version of the cache, and a CoFlatMap operator that receives the regular streamed input on its first input and the cache updates on its second input. When the Flink job is stopped, the update source is canceled like any other source.
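Roughly, something like this (an untested sketch; the String types, names like DictionarySource, and the refresh interval are placeholders to adapt to your data):

import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.util.Collector

// Periodically emits a fresh snapshot of the dictionary.
class DictionarySource(intervalMs: Long) extends SourceFunction[Map[String, String]] {
  @volatile private var running = true

  private def fetchDictionary: Map[String, String] = ??? // your fetch logic

  override def run(ctx: SourceFunction.SourceContext[Map[String, String]]): Unit = {
    while (running) {
      ctx.collect(fetchDictionary)
      Thread.sleep(intervalMs)
    }
  }

  // Called by Flink when the job is stopped or canceled.
  override def cancel(): Unit = running = false
}

// Keeps the latest dictionary snapshot and enriches the main stream with it.
class EnrichFlatMap extends RichCoFlatMapFunction[String, Map[String, String], (String, Option[String])] {
  private var cache: Map[String, String] = Map()

  // First input: the regular event stream.
  override def flatMap1(value: String, out: Collector[(String, Option[String])]): Unit =
    out.collect((value, cache.get(value)))

  // Second input: the cache updates; just swap in the new snapshot.
  override def flatMap2(update: Map[String, String], out: Collector[(String, Option[String])]): Unit =
    cache = update
}

Wiring it up (broadcasting the update stream so that every parallel instance of the CoFlatMap sees the full dictionary):

import org.apache.flink.streaming.api.scala._

val updates = env.addSource(new DictionarySource(24 * 60 * 60 * 1000L)) // once per day
val enriched = events
  .connect(updates.broadcast)
  .flatMap(new EnrichFlatMap)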
You might also want to expose the cache as operator state to Flink to ensure it is checkpointed and restored in case of a failure (rough sketch at the end of this mail).

Best,
Fabian

2016-02-14 18:36 GMT+01:00 Michal Fijolek <michalfijole...@gmail.com>:

> Hello.
> My app needs a Map[K, V] as a simple cache for business data, which needs
> to be invalidated periodically, let's say once per day.
> Right now I'm using a rather naive approach:
>
> trait Dictionary[K, V] extends Serializable {
>   @volatile private var cache: Map[K, V] = Map()
>   def lookup(id: K): Option[V] = cache.get(id)
>   private def fetchDictionary: Map[K, V] = ???
>   private def updateDictionary() = {
>     cache = fetchDictionary
>   }
>   val invalidate = new Runnable with Serializable {
>     override def run(): Unit = updateDictionary()
>   }
>   Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(invalidate, oncePerDay)
> }
>
> This seems wrong, because I guess I should do such a thing `inside` Flink,
> and when I stop the Flink job, nobody is going to stop the scheduled
> invalidation tasks.
> What would be the idiomatic Flink way to approach this problem? How can I
> schedule tasks and make Flink aware of them?
>
> Thanks,
> Michal
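For the state part, a rough sketch (again untested, with the same placeholder types as above; it uses the Checkpointed interface, so check that it matches the Flink version you are on):

import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

// Same CoFlatMap as before, but the cache is registered as operator state,
// so Flink snapshots it on each checkpoint and restores it after a failure.
class CheckpointedEnrichFlatMap
    extends RichCoFlatMapFunction[String, Map[String, String], (String, Option[String])]
    with Checkpointed[Map[String, String]] {

  private var cache: Map[String, String] = Map()

  override def flatMap1(value: String, out: Collector[(String, Option[String])]): Unit =
    out.collect((value, cache.get(value)))

  override def flatMap2(update: Map[String, String], out: Collector[(String, Option[String])]): Unit =
    cache = update

  // Returns the state to be stored in the checkpoint.
  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Map[String, String] =
    cache

  // Re-installs the last checkpointed state after a recovery.
  override def restoreState(state: Map[String, String]): Unit =
    cache = state
}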