Fabian's suggestion with the co-map is good. You can broadcast() the dictionary stream before connecting it, to make sure the dictionary updates reach all nodes; a sketch of that wiring follows below.
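A minimal sketch of that broadcast + co-map wiring, assuming hypothetical Event and DictEntry types and placeholder source streams (none of these names come from this thread):

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical record types, for illustration only.
case class Event(key: String, payload: String)
case class DictEntry(key: String, value: String)

class DictionaryCoFlatMap extends CoFlatMapFunction[Event, DictEntry, String] {
  // Local copy of the dictionary; every parallel instance sees all
  // entries because the dictionary stream is broadcast.
  private var dict = Map.empty[String, String]

  // First input: the regular event stream, enriched from the dictionary.
  override def flatMap1(event: Event, out: Collector[String]): Unit =
    dict.get(event.key).foreach(v => out.collect(s"${event.payload} -> $v"))

  // Second input: dictionary updates; just refresh the local map.
  override def flatMap2(entry: DictEntry, out: Collector[String]): Unit =
    dict += (entry.key -> entry.value)
}

// Wiring (the sources are placeholders):
// val env = StreamExecutionEnvironment.getExecutionEnvironment
// val events: DataStream[Event] = env.addSource(/* regular input */ ...)
// val dictUpdates: DataStream[DictEntry] = env.addSource(/* periodic cache versions */ ...)
// val enriched = events.connect(dictUpdates.broadcast).flatMap(new DictionaryCoFlatMap)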
If you want full control over how and when the data is read, a scheduled task is not a bad solution either. Make sure you implement it as a "RichFunction", so you can use "open()" to read the first set of data and "close()" to stop your threads (see the sketch at the end of this thread).

As a related issue: we are looking into extending the API to explicitly support such "slow changing inputs", similar to how "broadcast variables" work in the DataSet API. This is the JIRA issue; if you post your use case there, you can make it part of the discussion:
https://issues.apache.org/jira/browse/FLINK-3514

Greetings,
Stephan

On Mon, Feb 15, 2016 at 12:33 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Michal,
>
> If I got your requirements right, you could try to solve this issue by
> serving the updates through a regular DataStream.
> You could add a SourceFunction which periodically emits a new version of
> the cache and a CoFlatMap operator which receives the regular streamed
> input on its first input and the cache updates on its second input. If
> the Flink job gets stopped, the update source will be canceled like any
> regular source.
>
> You might also want to expose the cache as operator state to Flink to
> ensure it is checkpointed and restored in case of a failure.
>
> Best, Fabian
>
> 2016-02-14 18:36 GMT+01:00 Michal Fijolek <michalfijole...@gmail.com>:
>
>> Hello.
>> My app needs a Map[K, V] as a simple cache for business data, which
>> needs to be invalidated periodically, let's say once per day.
>> Right now I'm using a rather naive approach:
>>
>> import java.util.concurrent.{Executors, TimeUnit}
>>
>> trait Dictionary[K, V] extends Serializable {
>>   @volatile private var cache: Map[K, V] = Map()
>>
>>   def lookup(id: K): Option[V] = cache.get(id)
>>
>>   private def fetchDictionary: Map[K, V] = ???
>>
>>   private def updateDictionary() = {
>>     cache = fetchDictionary
>>   }
>>
>>   val invalidate = new Runnable with Serializable {
>>     override def run(): Unit = updateDictionary()
>>   }
>>
>>   // runs once per day; scheduleAtFixedRate needs an initial delay,
>>   // a period, and a TimeUnit
>>   Executors.newSingleThreadScheduledExecutor()
>>     .scheduleAtFixedRate(invalidate, 1L, 1L, TimeUnit.DAYS)
>> }
>>
>> This seems wrong, because I guess I should do such a thing `inside`
>> Flink, and when I stop the Flink job, nobody is going to stop the
>> scheduled invalidation tasks.
>> What would be the idiomatic Flink way to approach this problem? How can
>> I schedule tasks and make Flink aware of them?
>>
>> Thanks,
>> Michal
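A minimal sketch of the RichFunction approach described above, reusing the hypothetical Event type from the earlier sketch and assuming a fetchDictionary() loader that is not part of this thread. The refresh thread is created in open() and shut down in close(), so stopping the Flink job also stops the invalidation task:

import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class DictionaryEnricher extends RichMapFunction[Event, String] {
  @volatile private var dict: Map[String, String] = Map.empty
  @transient private var scheduler: ScheduledExecutorService = _

  // Placeholder: load the dictionary from the external store.
  private def fetchDictionary(): Map[String, String] = ???

  override def open(parameters: Configuration): Unit = {
    // First load happens before any record is processed.
    dict = fetchDictionary()
    scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(
      new Runnable { override def run(): Unit = dict = fetchDictionary() },
      1L, 1L, TimeUnit.DAYS)
  }

  override def close(): Unit = {
    // Called when the job is stopped or fails, so the refresh thread
    // dies together with the operator.
    if (scheduler != null) scheduler.shutdownNow()
  }

  override def map(event: Event): String =
    dict.get(event.key).map(v => s"${event.payload} -> $v").getOrElse(event.payload)
}

Note that dict is a plain local field here; as Fabian points out, it could additionally be exposed as operator state so it is checkpointed and restored on failure.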