Re: Caching information from a stream

2015-10-29 Thread Andra Lungu
Thanks Max ^^ On Wed, Oct 28, 2015 at 8:41 PM, Maximilian Michels wrote: > Oops, forgot the mapper :) > > static class StatefulMapper extends RichMapFunction Long>, Tuple2> { > >private OperatorState counter; > >@Override >public Tuple2 map(Tuple2 value) throws > Exception { >

Re: Caching information from a stream

2015-10-28 Thread Maximilian Michels
Oops, forgot the mapper :) static class StatefulMapper extends RichMapFunction, Tuple2> { private OperatorState counter; @Override public Tuple2 map(Tuple2 value) throws Exception { System.out.println("Key: " + value.f0 + " Previous state was: "+ counter.value() +

Re: Caching information from a stream

2015-10-28 Thread Maximilian Michels
Hi Andra, What you thought of turns out to be one of the core features of the Flink streaming API. Flink's operators support state. State can be partitioned by the key using keyBy(field). You may use a MapFunction to achieve what you wanted like so: public static void main(String[] args) th
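For readers of the archive, the idea of state partitioned by key can be illustrated without Flink at all. The following is only a plain-Java sketch and not the code from this message: the class name KeyedCacheSketch and the method process are hypothetical, and a HashMap stands in for what Flink would manage as per-key operator state after keyBy(field). It assumes the goal is to remember the last value seen for each id and to report it when the same id shows up again.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration only: a HashMap plays the role of key-partitioned state.
// This is NOT the Flink API; in Flink the runtime would scope the state to the current key.
public class KeyedCacheSketch {

    // One cached value per id, analogous to one state cell per key.
    private final Map<Long, Long> lastValueById = new HashMap<>();

    // Returns the value previously cached for this id (empty on first sighting),
    // then replaces the cached value with the new one.
    public Optional<Long> process(long id, long value) {
        Long previous = lastValueById.put(id, value); // put returns the old mapping or null
        return Optional.ofNullable(previous);
    }

    public static void main(String[] args) {
        KeyedCacheSketch cache = new KeyedCacheSketch();
        // A tiny stand-in for the (id, value) stream; id 1 appears twice.
        long[][] stream = { {1, 10}, {2, 20}, {1, 11} };
        for (long[] record : stream) {
            Optional<Long> previous = cache.process(record[0], record[1]);
            System.out.println("Key: " + record[0] + " Previous state was: "
                    + previous.map(String::valueOf).orElse("none"));
        }
    }
}
```

In this sketch the second record with id 1 sees the value 10 cached by the first one, while id 2 keeps its own separate entry. A real Flink job would get the same per-key isolation from the keyed stream instead of an explicit map.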

Caching information from a stream

2015-10-28 Thread Andra Lungu
Hey guys! I've been thinking about this one today: Say you have a stream of data in the form of (id, value) - This will evidently be a DataStream of Tuple2. I need to cache this data in some sort of static stream (perhaps even a DataSet). Then, if in the input stream, I see an id that was previou