Hi Michal,

If I got your requirements right, you could solve this by serving the
updates through a regular DataStream.
You could add a SourceFunction that periodically emits a new version of
the cache, and a CoFlatMapFunction that receives the regular streamed
input on its first input and the cache updates on its second. When the
Flink job is stopped, the update source is canceled like any other
source, so no scheduled task is left behind.

You might also want to expose the cache as operator state to Flink to
ensure it is checkpointed and restored in case of a failure.
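A minimal sketch of that wiring might look as follows. The element types
(`String` keys and values), the `fetchDictionary()` loader, and the
hard-coded one-day interval are all placeholders for illustration, not part
of any real API:

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.util.Collector

// Periodically emits a fresh snapshot of the dictionary. Flink calls
// cancel() when the job is stopped, so no external scheduler keeps running.
class DictionarySource extends SourceFunction[Map[String, String]] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Map[String, String]]): Unit = {
    while (running) {
      ctx.collect(fetchDictionary())      // hypothetical loader
      Thread.sleep(24 * 60 * 60 * 1000L)  // once per day
    }
  }

  override def cancel(): Unit = { running = false }

  private def fetchDictionary(): Map[String, String] = Map() // placeholder
}

// Holds the latest dictionary snapshot and uses it to look up elements
// of the main stream. flatMap2 simply replaces the cached map.
class DictionaryLookup extends CoFlatMapFunction[String, Map[String, String], String] {
  private var cache: Map[String, String] = Map()

  override def flatMap1(key: String, out: Collector[String]): Unit =
    cache.get(key).foreach(out.collect)

  override def flatMap2(update: Map[String, String], out: Collector[String]): Unit =
    cache = update
}

// Wiring inside a job (events is the regular input stream):
//   val updates = env.addSource(new DictionarySource).broadcast
//   events.connect(updates).flatMap(new DictionaryLookup)
```

Broadcasting the update stream ensures every parallel instance of the
CoFlatMapFunction sees each new dictionary version; to have the cache
checkpointed, the function would additionally need to expose it as
operator state as described above.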

Best, Fabian

2016-02-14 18:36 GMT+01:00 Michal Fijolek <michalfijole...@gmail.com>:

> Hello.
> My app needs a Map[K, V] as a simple cache for business data, which needs
> to be invalidated periodically, let's say once per day.
> Right now I'm using rather naive approach which is
>
> import java.util.concurrent.{Executors, TimeUnit}
>
> trait Dictionary[K, V] extends Serializable {
>   @volatile private var cache: Map[K, V] = Map()
>   def lookup(id: K): Option[V] = cache.get(id)
>   private def fetchDictionary: Map[K, V] = ???
>   private def updateDictionary(): Unit = {
>     cache = fetchDictionary
>   }
>   val invalidate = new Runnable with Serializable {
>     override def run(): Unit = updateDictionary()
>   }
>   Executors.newSingleThreadScheduledExecutor()
>     .scheduleAtFixedRate(invalidate, 0L, 24L, TimeUnit.HOURS) // once per day
> }
>
> This seems wrong, because I guess I should do such a thing `inside` Flink,
> and when I stop the Flink job, nobody's gonna stop the scheduled
> invalidation tasks.
> What would be the idiomatic Flink way to approach this problem? How can I
> schedule tasks and make Flink aware of them?
>
> Thanks,
> Michal
>
>
