schedule tasks `inside` Flink

2016-02-14 Thread Michal Fijolek
Hello. My app needs a Map[K, V] as a simple cache for business data, which needs to be invalidated periodically, let's say once per day. Right now I'm using a rather naive approach, which is trait Dictionary[K, V] extends Serializable { @volatile private var cache: Map[K, V] = Map() def lookup(id: K):
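
For readers skimming the archive, here is a minimal sketch of the kind of periodically invalidated cache the message describes. It is not the poster's code, which is cut off above, and it does not use any Flink API. The names ExpiringDictionary, loader, ttlMillis and clock are hypothetical, and the reload is triggered lazily on lookup rather than by a scheduled task.

```scala
// Hypothetical sketch: a map-backed cache that reloads itself once its
// snapshot is older than ttlMillis. All names here are illustrative.
class ExpiringDictionary[K, V](
    loader: () => Map[K, V],                              // assumed source of the business data
    ttlMillis: Long,                                      // how long one snapshot stays valid
    clock: () => Long = () => System.currentTimeMillis()  // injectable for testing
) extends Serializable {

  @volatile private var cache: Map[K, V] = Map.empty
  @volatile private var loadedAt: Long = 0L
  @volatile private var loaded: Boolean = false

  // Returns the cached value, reloading the whole map first if it is stale.
  def lookup(id: K): Option[V] = {
    refreshIfStale()
    cache.get(id)
  }

  private def isStale(now: Long): Boolean =
    !loaded || now - loadedAt >= ttlMillis

  // Double-checked refresh so that concurrent callers reload only once.
  private def refreshIfStale(): Unit = {
    if (isStale(clock())) this.synchronized {
      val now = clock()
      if (isStale(now)) {
        cache = loader()
        loadedAt = now
        loaded = true
      }
    }
  }
}
```

With a ttlMillis of 24 * 60 * 60 * 1000L this would reload roughly once per day, on the first lookup after the snapshot expires. Whether a lazy reload is acceptable, compared with a task scheduled inside the job, depends on how expensive the loader is.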

Re: Finding the average temperature

2016-02-14 Thread Nirmalya Sengupta
Hello Stefano, I have tried to implement what I understood from your mail earlier in the day, but it doesn't seem to produce the result I expect. Here's the code snippet: - val env = StreamExecutionEnvironment.createLocal

Re: Finding the average temperature

2016-02-14 Thread Stefano Baghino
Hello again Nirmalya, in this case you are keying by timestamp; think of keying as grouping: this means that windows are brought together according to their timestamp. I misread your original post but now that I see the code I understand your problem. I've put some code here: https://github.com/
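
The "keying as grouping" point can be illustrated with plain Scala collections. This is only a sketch, not the code linked in the message and not Flink code. The Reading type, the sensor field and the sample values are assumptions made for the illustration.

```scala
object KeyingDemo {
  // Hypothetical record type; the thread excerpt does not show the real one.
  final case class Reading(sensor: String, timestamp: Long, temperature: Double)

  val readings: Seq[Reading] = Seq(
    Reading("probe-1", 1L, 20.0),
    Reading("probe-1", 2L, 22.0),
    Reading("probe-2", 3L, 30.0)
  )

  // Group the readings by an arbitrary key and average each group.
  def averageBy[K](rs: Seq[Reading])(key: Reading => K): Map[K, Double] =
    rs.groupBy(key).map { case (k, group) =>
      k -> group.map(_.temperature).sum / group.size
    }

  // Keyed by timestamp: every group holds a single reading.
  val byTimestamp: Map[Long, Double] = averageBy(readings)(_.timestamp)

  // Keyed by sensor: readings from the same sensor are averaged together.
  val bySensor: Map[String, Double] = averageBy(readings)(_.sensor)
}
```

When every reading has a distinct timestamp, grouping by timestamp leaves one element per group, so each "average" is just the reading itself. Grouping by a coarser key, here the assumed sensor field, is what brings several readings together.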

Newbie question

2016-02-14 Thread Renato Marroquín Mogrovejo
Hi all, I have two streams in which I need to keep counts of different metrics that will have to be shared by both of the streams. So they will be sharing some state once they have finished processing the stream. My question is whether I should do this as a sink aggregating what I need at the end or by

Re: Newbie question

2016-02-14 Thread Gyula Fóra
Hi Renato, first of all, to do anything with the two streams together you probably want to union them. This means that you need to have a common type. If this is the case you are lucky and you don't need anything else. Otherwise I suggest using the Either type provided by Flink as a simple wrapper.
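
The wrapper idea can be sketched with plain Scala collections. This sketch uses the Scala standard library's Either as a stand-in for the Flink type mentioned in the reply, and ordinary sequences as a stand-in for streams. The Click and Purchase types and the per-user count are hypothetical examples of "different metrics".

```scala
object UnionDemo {
  // Hypothetical element types of the two inputs.
  final case class Click(user: String)
  final case class Purchase(user: String, amount: Double)

  // Wrap each element so that both inputs share one common type,
  // then combine them into a single sequence.
  def union(clicks: Seq[Click], purchases: Seq[Purchase]): Seq[Either[Click, Purchase]] =
    clicks.map(c => Left[Click, Purchase](c)) ++
      purchases.map(p => Right[Click, Purchase](p))

  // One shared count per user, updated by events from either input.
  def countPerUser(events: Seq[Either[Click, Purchase]]): Map[String, Int] =
    events.foldLeft(Map.empty[String, Int]) { (counts, event) =>
      val user = event match {
        case Left(click)     => click.user
        case Right(purchase) => purchase.user
      }
      counts.updated(user, counts.getOrElse(user, 0) + 1)
    }
}
```

Once both inputs are wrapped in a common type, a single downstream step can hold the shared counts, and pattern matching recovers which input each element came from.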