Thanks for the help, guys! Eventually I implemented it as a RichFunction, using the open() and close() methods.
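For anyone finding this thread later, here is a minimal sketch of that approach. The Event/EnrichedEvent types and the fetchDictionary() stub are hypothetical placeholders, not from the thread; the point is that the scheduler is started in open() and shut down in close(), so stopping the job also stops the refresh thread.

    import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.configuration.Configuration

    // Hypothetical types for illustration only.
    case class Event(key: String, payload: String)
    case class EnrichedEvent(event: Event, dictValue: Option[String])

    class DictionaryEnricher extends RichMapFunction[Event, EnrichedEvent] {

      @volatile private var cache: Map[String, String] = Map.empty
      @transient private var scheduler: ScheduledExecutorService = _

      // Stub: load the dictionary from the external store.
      private def fetchDictionary(): Map[String, String] = ???

      override def open(parameters: Configuration): Unit = {
        cache = fetchDictionary() // initial load before the first record
        scheduler = Executors.newSingleThreadScheduledExecutor()
        scheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = { cache = fetchDictionary() }
        }, 1, 1, TimeUnit.DAYS)
      }

      override def close(): Unit = {
        // Runs when the job stops or fails, so the refresh thread does not leak.
        if (scheduler != null) scheduler.shutdownNow()
      }

      override def map(event: Event): EnrichedEvent =
        EnrichedEvent(event, cache.get(event.key))
    }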
Michał

2016-02-25 19:00 GMT+01:00 Stephan Ewen <se...@apache.org>:

> Fabian's suggestion with the co-map is good. You can use a "broadcast()"
> connect to make sure the dictionary gets to all nodes.
>
> If you want full control over how and when to read the data, a scheduled
> task is not a bad solution either. Make sure you implement it as a
> "RichFunction", so you can use "open()" to read the first set of data and
> "close()" to stop your threads.
>
> As a related issue: we are looking into extensions to the API to
> explicitly support such "slow changing inputs", similar to how
> "broadcast variables" work in the DataSet API.
> This is the JIRA issue; if you post your use case there, you can make it
> part of the discussion: https://issues.apache.org/jira/browse/FLINK-3514
>
> Greetings,
> Stephan
>
> On Mon, Feb 15, 2016 at 12:33 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Michal,
>>
>> If I got your requirements right, you could try to solve this by serving
>> the updates through a regular DataStream.
>> You could add a SourceFunction which periodically emits a new version of
>> the cache, and a CoFlatMap operator which receives the regular streamed
>> input on its first input and the cache updates on its second. If the
>> Flink job gets stopped, the update source will be canceled like any
>> regular source.
>>
>> You might also want to expose the cache as operator state to Flink to
>> ensure it is checkpointed and restored in case of a failure.
>>
>> Best, Fabian
>>
>> 2016-02-14 18:36 GMT+01:00 Michal Fijolek <michalfijole...@gmail.com>:
>>
>>> Hello.
>>> My app needs a Map[K, V] as a simple cache for business data, which
>>> needs to be invalidated periodically, let's say once per day.
>>> Right now I'm using a rather naive approach:
>>>
>>> trait Dictionary[K, V] extends Serializable {
>>>   @volatile private var cache: Map[K, V] = Map()
>>>
>>>   def lookup(id: K): Option[V] = cache.get(id)
>>>
>>>   private def fetchDictionary: Map[K, V] = ???
>>>
>>>   private def updateDictionary() = {
>>>     cache = fetchDictionary
>>>   }
>>>
>>>   val invalidate = new Runnable with Serializable {
>>>     override def run(): Unit = updateDictionary()
>>>   }
>>>
>>>   Executors.newSingleThreadScheduledExecutor()
>>>     .scheduleAtFixedRate(invalidate, oncePerDay)
>>> }
>>>
>>> This seems wrong, because I guess I should do such a thing `inside`
>>> Flink, and when I stop the Flink job, nobody's gonna stop the scheduled
>>> invalidation tasks.
>>> What would be the idiomatic Flink way to approach this problem? How can
>>> I schedule tasks and make Flink aware of them?
>>>
>>> Thanks,
>>> Michal
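And for completeness, a sketch of the co-map approach Fabian and Stephan describe above, reusing the hypothetical Event/EnrichedEvent types from the previous sketch. The broadcast() connect makes sure every parallel instance of the co-operator sees each dictionary update. For brevity the cache lives in a plain field rather than in checkpointed operator state, which Fabian's note about checkpointing would add.

    import java.util.concurrent.TimeUnit

    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    // Periodically emits a fresh dictionary snapshot; canceled with the job.
    class DictionarySource extends SourceFunction[Map[String, String]] {
      @volatile private var running = true

      // Stub: load the dictionary from the external store.
      private def fetchDictionary(): Map[String, String] = ???

      override def run(ctx: SourceFunction.SourceContext[Map[String, String]]): Unit =
        while (running) {
          ctx.collect(fetchDictionary())
          Thread.sleep(TimeUnit.DAYS.toMillis(1))
        }

      override def cancel(): Unit = running = false
    }

    // First input: regular events; second input: dictionary snapshots.
    class DictionaryCoFlatMap
        extends CoFlatMapFunction[Event, Map[String, String], EnrichedEvent] {

      private var cache: Map[String, String] = Map.empty

      override def flatMap1(event: Event, out: Collector[EnrichedEvent]): Unit =
        out.collect(EnrichedEvent(event, cache.get(event.key)))

      override def flatMap2(dict: Map[String, String],
                            out: Collector[EnrichedEvent]): Unit =
        cache = dict // swap in the new snapshot; nothing is emitted
    }

    // Wiring (inside your job definition):
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val events: DataStream[Event] = ??? // your actual event source

    val enriched: DataStream[EnrichedEvent] = events
      .connect(env.addSource(new DictionarySource).broadcast)
      .flatMap(new DictionaryCoFlatMap)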