Oops, forgot the mapper :)

static class StatefulMapper extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    private OperatorState<Long> counter;

    @Override
    public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
        System.out.println("Key: " + value.f0
                + " Previous state was: " + counter.value()
                + " Update state to: " + value.f1);
        counter.update(value.f1);
        return value;
    }

    @Override
    public void open(Configuration config) {
        counter = getRuntimeContext().getKeyValueState("mystate", Long.class, -1L);
    }
}

On Wed, Oct 28, 2015 at 7:39 PM, Maximilian Michels <m...@apache.org> wrote:
> Hi Andra,
>
> What you thought of turns out to be one of the core features of the Flink
> streaming API. Flink's operators support state. State can be partitioned
> by the key using keyBy(field).
>
> You may use a MapFunction to achieve what you want, like so:
>
> public static void main(String[] args) throws Exception {
>
>     final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>
>     env.fromElements(new Tuple2<>(1L, 3L),
>                      new Tuple2<>(2L, 5L),
>                      new Tuple2<>(6L, 7L),
>                      new Tuple2<>(1L, 5L))
>        .keyBy(0)
>        .map(new StatefulMapper())
>        .print();
>
>     env.execute();
> }
>
> The output is the following on my machine (discarding the output of the
> print):
>
> Key: 2 Previous state was: -1 Update state to: 5
> Key: 1 Previous state was: -1 Update state to: 3
> Key: 6 Previous state was: -1 Update state to: 7
> Key: 1 Previous state was: 3 Update state to: 5
>
> Cheers,
> Max
>
> On Wed, Oct 28, 2015 at 4:30 PM, Andra Lungu <lungu.an...@gmail.com>
> wrote:
>
>> Hey guys!
>>
>> I've been thinking about this one today:
>>
>> Say you have a stream of data in the form of (id, value) - this will
>> evidently be a DataStream of Tuple2.
>> I need to cache this data in some sort of static stream (perhaps even a
>> DataSet).
>> Then, if in the input stream, I see an id that was previously stored, I
>> should update its value with the most recent entry.
>>
>> As an example:
>>
>> 1, 3
>> 2, 5
>> 6, 7
>> 1, 5
>>
>> The value cached for the id 1 should be 5.
>>
>> How would you recommend caching the data? And what would be used for the
>> update? A join function?
>>
>> As far as I see things, you cannot really combine DataSets with
>> DataStreams, although a DataSet is, in essence, just a finite stream.
>> If this can indeed be done, some pseudocode would be nice :)
>>
>> Thanks!
>> Andra
>>
>
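For readers who want to see the per-key semantics Max describes without setting up a Flink job: the behavior of the keyed mapper above can be sketched with a plain HashMap keyed by id. This is an illustration of the semantics only, not Flink code; the class name KeyedCounterDemo and the HashMap approach are made up for this sketch, and -1 stands in for the default value passed to getKeyValueState in the thread.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: emulates the per-key "previous state / update"
// behavior of the Flink mapper above using a plain HashMap. Not Flink API;
// KeyedCounterDemo is a hypothetical name for this demo.
public class KeyedCounterDemo {
    public static void main(String[] args) {
        // The same (id, value) input as in the thread.
        long[][] input = {{1, 3}, {2, 5}, {6, 7}, {1, 5}};

        // One state cell per key; -1 plays the role of the default value
        // that the Flink example passes to getKeyValueState.
        Map<Long, Long> state = new HashMap<>();

        for (long[] pair : input) {
            long key = pair[0];
            long value = pair[1];
            long previous = state.getOrDefault(key, -1L);
            System.out.println("Key: " + key
                    + " Previous state was: " + previous
                    + " Update state to: " + value);
            state.put(key, value); // the most recent value for a key wins
        }
        // After the loop, the cached value for id 1 is 5, as Andra expected.
    }
}
```

Unlike the real Flink job, this single-threaded loop processes elements in input order, so the printed lines come out in that order; in the parallel Flink run Max shows, lines for different keys may interleave differently.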