Thanks Max ^^

On Wed, Oct 28, 2015 at 8:41 PM, Maximilian Michels <m...@apache.org> wrote:

> Oops, forgot the mapper :)
>
> static class StatefulMapper
>       extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>
>    private OperatorState<Long> counter;
>
>    @Override
>    public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
>       System.out.println("Key: " + value.f0 +
>             " Previous state was: " + counter.value() +
>             " Update state to: " + value.f1);
>       counter.update(value.f1);
>       return value;
>    }
>
>    @Override
>    public void open(Configuration config) {
>       counter = getRuntimeContext().getKeyValueState("mystate", Long.class, -1L);
>    }
> }
>
>
>
> On Wed, Oct 28, 2015 at 7:39 PM, Maximilian Michels <m...@apache.org>
> wrote:
>
> > Hi Andra,
> >
> > What you thought of turns out to be one of the core features of the Flink
> > streaming API: Flink's operators support state, and that state can be
> > partitioned by key using keyBy(field).
> >
> > You may use a MapFunction to achieve what you wanted like so:
> >
> > public static void main(String[] args) throws Exception {
> >
> >    final StreamExecutionEnvironment env =
> >          StreamExecutionEnvironment.getExecutionEnvironment();
> >
> >    env.fromElements(new Tuple2<>(1L, 3L),
> >          new Tuple2<>(2L, 5L),
> >          new Tuple2<>(6L, 7L),
> >          new Tuple2<>(1L, 5L))
> >        .keyBy(0)
> >        .map(new StatefulMapper())
> >        .print();
> >
> >    env.execute();
> > }
> >
> > On my machine the output is the following (the output of the print() call
> > itself is omitted):
> >
> > Key: 2 Previous state was: -1 Update state to: 5
> > Key: 1 Previous state was: -1 Update state to: 3
> > Key: 6 Previous state was: -1 Update state to: 7
> > Key: 1 Previous state was: 3 Update state to: 5
> >
> >
> > Cheers,
> > Max
> >
> >
> >
> > On Wed, Oct 28, 2015 at 4:30 PM, Andra Lungu <lungu.an...@gmail.com>
> > wrote:
> >
> >> Hey guys!
> >>
> >> I've been thinking about this one today:
> >>
> >> Say you have a stream of data in the form of (id, value) - This will
> >> evidently be a DataStream of Tuple2.
> >> I need to cache this data in some sort of static stream (perhaps even a
> >> DataSet).
> >> Then, if I see an id in the input stream that was previously stored, I
> >> should update its value with the most recent entry.
> >>
> >> On an example:
> >>
> >> 1, 3
> >> 2, 5
> >> 6, 7
> >> 1, 5
> >>
> >> The value cached for the id 1 should be 5.
> >>
> >> How would you recommend caching the data? And what would be used for the
> >> update? A join function?
> >>
> >> As far as I see things, you cannot really combine DataSets with
> >> DataStreams, although a DataSet is, in essence, just a finite stream.
> >> If this can indeed be done, some pseudocode would be nice :)
> >>
> >> Thanks!
> >> Andra
> >>
> >
> >
>
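
For anyone reading this thread later: the per-key "keep the latest value" semantics that the quoted StatefulMapper implements with Flink's keyed state can be sketched in plain Java (no Flink dependency) to make the behavior concrete. The class and method names below are illustrative only, not part of any Flink API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the semantics in the thread above: for each
// (id, value) pair in arrival order, remember the most recent value
// seen per id, just as the per-key OperatorState does per key.
public class LatestPerKey {

    // Replays the stream in order; each put overwrites the previous
    // value for that key, mirroring counter.update(value.f1).
    static Map<Long, Long> latestPerKey(List<long[]> stream) {
        Map<Long, Long> state = new LinkedHashMap<>();
        for (long[] pair : stream) {
            state.put(pair[0], pair[1]); // overwrite = "update state"
        }
        return state;
    }

    public static void main(String[] args) {
        List<long[]> input = List.of(
                new long[]{1, 3}, new long[]{2, 5},
                new long[]{6, 7}, new long[]{1, 5});
        // The value cached for id 1 ends up as 5, as in the example.
        System.out.println(latestPerKey(input));
    }
}
```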
