Thanks Max ^^

On Wed, Oct 28, 2015 at 8:41 PM, Maximilian Michels <m...@apache.org> wrote:
> Oops, forgot the mapper :)
>
> static class StatefulMapper extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>
>     private OperatorState<Long> counter;
>
>     @Override
>     public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
>         System.out.println("Key: " + value.f0 +
>             " Previous state was: " + counter.value() +
>             " Update state to: " + value.f1);
>         counter.update(value.f1);
>         return value;
>     }
>
>     @Override
>     public void open(Configuration config) {
>         counter = getRuntimeContext().getKeyValueState("mystate", Long.class, -1L);
>     }
> }
>
> On Wed, Oct 28, 2015 at 7:39 PM, Maximilian Michels <m...@apache.org> wrote:
>
> > Hi Andra,
> >
> > What you thought of turns out to be one of the core features of the Flink
> > streaming API. Flink's operators support state. State can be partitioned
> > by key using keyBy(field).
> >
> > You may use a MapFunction to achieve what you want, like so:
> >
> > public static void main(String[] args) throws Exception {
> >
> >     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >
> >     env.fromElements(new Tuple2<>(1L, 3L),
> >             new Tuple2<>(2L, 5L),
> >             new Tuple2<>(6L, 7L),
> >             new Tuple2<>(1L, 5L))
> >         .keyBy(0)
> >         .map(new StatefulMapper())
> >         .print();
> >
> >     env.execute();
> > }
> >
> > The output is the following on my machine (discarding the output of the
> > print):
> >
> > Key: 2 Previous state was: -1 Update state to: 5
> > Key: 1 Previous state was: -1 Update state to: 3
> > Key: 6 Previous state was: -1 Update state to: 7
> > Key: 1 Previous state was: 3 Update state to: 5
> >
> > Cheers,
> > Max
> >
> > On Wed, Oct 28, 2015 at 4:30 PM, Andra Lungu <lungu.an...@gmail.com> wrote:
> >
> >> Hey guys!
> >>
> >> I've been thinking about this one today:
> >>
> >> Say you have a stream of data in the form of (id, value) - this will
> >> evidently be a DataStream of Tuple2.
> >> I need to cache this data in some sort of static stream (perhaps even a
> >> DataSet).
> >> Then, if the input stream contains an id that was previously stored, I
> >> should update its value with the most recent entry.
> >>
> >> An example:
> >>
> >> 1, 3
> >> 2, 5
> >> 6, 7
> >> 1, 5
> >>
> >> The value cached for id 1 should be 5.
> >>
> >> How would you recommend caching the data? And what would be used for the
> >> update? A join function?
> >>
> >> As far as I can see, you cannot really combine DataSets with DataStreams,
> >> although a DataSet is, in essence, just a finite stream.
> >> If this can indeed be done, some pseudocode would be nice :)
> >>
> >> Thanks!
> >> Andra
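[Editor's note: stripped of the Flink machinery, the per-key state Max's StatefulMapper keeps is just an upsert of the latest value per key, with -1 as the "no previous value" default. A minimal plain-Java sketch of that logic, runnable without the Flink dependency (class and method names here are hypothetical, not from Flink):]

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KeyedStateSketch {
    // Matches the -1L default value passed to getKeyValueState in Max's example.
    static final long DEFAULT = -1L;

    // One state slot per key, like Flink's keyed state partitioned by keyBy(0).
    private final Map<Long, Long> state = new LinkedHashMap<>();

    /** Returns the previous value for the key (or DEFAULT) and stores the new one. */
    long update(long key, long value) {
        long previous = state.getOrDefault(key, DEFAULT);
        state.put(key, value);
        return previous;
    }

    public static void main(String[] args) {
        KeyedStateSketch s = new KeyedStateSketch();
        // Same input tuples as the fromElements(...) call in the thread.
        long[][] input = {{1, 3}, {2, 5}, {6, 7}, {1, 5}};
        for (long[] t : input) {
            long prev = s.update(t[0], t[1]);
            System.out.println("Key: " + t[0]
                + " Previous state was: " + prev
                + " Update state to: " + t[1]);
        }
    }
}
```

[Processed in order, this prints a previous state of -1 for the first occurrence of each key, and 3 for the second occurrence of key 1, matching the semantics (though not the ordering) of Max's parallel output. In real Flink the state is also checkpointed and scoped per key by the runtime, which this sketch does not attempt.]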