Thanks for the help, guys!
In the end I implemented it as a RichFunction, using the open() and close()
methods.
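
A minimal sketch of that approach, for anyone reading this thread later. It assumes the Flink 1.x API, where a rich function overrides open(Configuration). DictionaryLookup, fetchDictionary, the String key/value types and the one-day period are placeholders for illustration, not the actual code:

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

import scala.util.control.NonFatal

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Hypothetical enrichment function: looks up each key in a periodically refreshed cache.
class DictionaryLookup extends RichMapFunction[String, (String, Option[String])] {

  // Both fields are created in open(), so they are never shipped with the serialized function.
  @volatile @transient private var cache: Map[String, String] = Map.empty
  @transient private var scheduler: ScheduledExecutorService = _

  // Placeholder for the real lookup (database, REST call, file, ...).
  private def fetchDictionary(): Map[String, String] = Map("a" -> "alpha")

  override def open(parameters: Configuration): Unit = {
    cache = fetchDictionary() // first load, before any record is processed
    scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit =
        try { cache = fetchDictionary() }
        catch { case NonFatal(_) => () } // keep the old cache; an uncaught exception would cancel all later runs
    }, 1, 1, TimeUnit.DAYS)
  }

  override def map(key: String): (String, Option[String]) = (key, cache.get(key))

  override def close(): Unit = {
    // Stop the refresh thread together with the task.
    if (scheduler != null) scheduler.shutdownNow()
  }
}
```

It would be used like any other map function, e.g. stream.map(new DictionaryLookup). Each parallel instance keeps its own copy of the cache and its own refresh thread.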

Michał
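
P.S. The co-flat-map variant that Fabian and Stephan describe below could look roughly like this. It is only a sketch, assuming the Flink 1.x Scala DataStream API; DictionaryJoin, DictionaryJob.enrich and the String key/value types are made-up names for illustration, and the source that periodically emits a fresh dictionary is left out:

```scala
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical two-input operator: input 1 carries the regular records,
// input 2 carries complete new versions of the dictionary.
class DictionaryJoin
    extends CoFlatMapFunction[String, Map[String, String], (String, Option[String])] {

  // Local copy of the dictionary; empty until the first update arrives.
  private var cache: Map[String, String] = Map.empty

  // Regular record: enrich it with whatever the cache currently holds.
  override def flatMap1(key: String, out: Collector[(String, Option[String])]): Unit =
    out.collect((key, cache.get(key)))

  // Dictionary update: replace the cache, emit nothing.
  override def flatMap2(update: Map[String, String],
                        out: Collector[(String, Option[String])]): Unit = {
    cache = update
  }
}

object DictionaryJob {
  // broadcast sends every dictionary update to all parallel instances of the operator.
  def enrich(events: DataStream[String],
             updates: DataStream[Map[String, String]]): DataStream[(String, Option[String])] =
    events.connect(updates.broadcast).flatMap(new DictionaryJoin)
}
```

Records that arrive before the first dictionary update see an empty cache, and the cache in this sketch is not registered as operator state, so it would not be checkpointed.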

2016-02-25 19:00 GMT+01:00 Stephan Ewen <se...@apache.org>:

> Fabian's suggestion with the co-map is good. You can use "broadcast()" when
> connecting the streams to make sure the dictionary gets to all nodes.
>
> If you want full control over how and when to read the data, a scheduled
> task is not a bad solution either. Make sure you implement this as a
> "RichFunction", so you can use "open()" to read the first set of data and
> "close()" to stop your threads.
>
> As a related issue: We are looking into extensions to the API to
> explicitly support such "slow changing inputs" in a similar way as
> "broadcast variables" work in the DataSet API.
> This is the JIRA issue, if you post your use case there, you can make this
> part of the discussion: https://issues.apache.org/jira/browse/FLINK-3514
>
> Greetings,
> Stephan
>
> On Mon, Feb 15, 2016 at 12:33 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Michal,
>>
>> If I got your requirements right, you could try to solve this issue by
>> serving the updates through a regular DataStream.
>> You could add a SourceFunction which periodically emits a new version of
>> the cache and a CoFlatMap operator which receives on the first input the
>> regular streamed input and on the second input the cache updates. If the
>> Flink job gets stopped, the update source will be canceled like any regular
>> source.
>>
>> You might also want to expose the cache as operator state to Flink to
>> ensure it is checkpointed and restored in case of a failure.
>>
>> Best, Fabian
>>
>> 2016-02-14 18:36 GMT+01:00 Michal Fijolek <michalfijole...@gmail.com>:
>>
>>> Hello.
>>> My app needs a Map[K, V] as a simple cache for business data, which needs to
>>> be invalidated periodically, let's say once per day.
>>> Right now I'm using a rather naive approach, which is:
>>>
>>> trait Dictionary[K, V] extends Serializable {
>>>   @volatile private var cache: Map[K, V] = Map()
>>>   def lookup(id: K): Option[V] = cache.get(id)
>>>   private def fetchDictionary: Map[K, V] = ???
>>>   private def updateDictionary() = {
>>>     cache = fetchDictionary
>>>   }
>>>   val invalidate = new Runnable with Serializable {
>>>     override def run(): Unit = updateDictionary()
>>>   }
>>>   Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(invalidate, oncePerDay)
>>> }
>>>
>>> This seems wrong, because I guess I should do such a thing `inside` Flink,
>>> and when I stop the Flink job, nobody's going to stop the scheduled invalidation tasks.
>>> What would be the idiomatic Flink way to approach this problem? How can I
>>> schedule tasks and make Flink aware of them?
>>>
>>> Thanks,
>>> Michal
>>>
>>>
>>
>
