Hi Andra,

What you thought of turns out to be one of the core features of the Flink streaming API: Flink's operators support state, and that state can be partitioned by key using keyBy(field).
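To make the semantics concrete before the Flink code: a minimal, Flink-free sketch of what the partitioned state does, using a plain Java HashMap to stand in for the per-key state. The class and method names here are illustrative only, not part of the Flink API:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: models the per-key state that keyBy(...) plus a
// stateful MapFunction maintain inside Flink. KeyedStateSketch is a
// hypothetical name, not a Flink class.
public class KeyedStateSketch {
    private final Map<Long, Long> stateByKey = new HashMap<>();

    // Returns the previous value for the key (-1 if unseen), then stores
    // the new value: "update with the most recent entry".
    public long update(long key, long value) {
        long previous = stateByKey.getOrDefault(key, -1L);
        stateByKey.put(key, value);
        return previous;
    }

    // Current cached value for the key, or -1 if the key was never seen.
    public long current(long key) {
        return stateByKey.getOrDefault(key, -1L);
    }
}
```

Feeding your example stream (1,3), (2,5), (6,7), (1,5) through update() leaves the value cached for key 1 at 5, exactly as you described.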
You may use a MapFunction to achieve what you wanted, like so:

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements(
            new Tuple2<>(1L, 3L),
            new Tuple2<>(2L, 5L),
            new Tuple2<>(6L, 7L),
            new Tuple2<>(1L, 5L))
        .keyBy(0)
        .map(new StatefulMapper())
        .print();

    env.execute();
}

The output is the following on my machine (I discarded the output of the print):

Key: 2 Previous state was: -1 Update state to: 5
Key: 1 Previous state was: -1 Update state to: 3
Key: 6 Previous state was: -1 Update state to: 7
Key: 1 Previous state was: 3 Update state to: 5

Cheers,
Max

On Wed, Oct 28, 2015 at 4:30 PM, Andra Lungu <lungu.an...@gmail.com> wrote:
> Hey guys!
>
> I've been thinking about this one today:
>
> Say you have a stream of data in the form of (id, value) - this will
> evidently be a DataStream of Tuple2.
> I need to cache this data in some sort of static stream (perhaps even a
> DataSet).
> Then, if in the input stream, I see an id that was previously stored, I
> should update its value with the most recent entry.
>
> On an example:
>
> 1, 3
> 2, 5
> 6, 7
> 1, 5
>
> The value cached for the id 1 should be 5.
>
> How would you recommend caching the data? And what would be used for the
> update? A join function?
>
> As far as I see things, you cannot really combine DataSets with DataStreams,
> although a DataSet is, in essence, just a finite stream.
> If this can indeed be done, some pseudocode would be nice :)
>
> Thanks!
> Andra