Oops, forgot the mapper :)

static class StatefulMapper extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

   // Per-key state holding the last value seen for the current key.
   private OperatorState<Long> counter;

   @Override
   public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
      System.out.println("Key: " + value.f0 +
            " Previous state was: " + counter.value() +
            " Update state to: " + value.f1);
      counter.update(value.f1);
      return value;
   }

   @Override
   public void open(Configuration config) {
      // "mystate" is the state name; -1L is the default value
      // returned by counter.value() before the first update.
      counter = getRuntimeContext().getKeyValueState("mystate", Long.class, -1L);
   }
}
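For intuition: the keyed state the mapper uses behaves like a map from key to the last value seen for that key, with Flink handling partitioning and fault tolerance. A minimal plain-Java sketch of just that semantics (no Flink involved; the class name and variable names are illustrative, not part of any API):

```java
import java.util.HashMap;
import java.util.Map;

public class LatestValueCache {
    public static void main(String[] args) {
        // Same (id, value) input as the streaming example below.
        long[][] input = {{1, 3}, {2, 5}, {6, 7}, {1, 5}};

        // What keyed state amounts to: one slot per key,
        // overwritten each time that key appears again.
        Map<Long, Long> latest = new HashMap<>();
        for (long[] pair : input) {
            latest.put(pair[0], pair[1]);
        }

        System.out.println(latest.get(1L)); // prints 5: id 1 was updated from 3 to 5
    }
}
```

The difference in Flink is that this "map" is sharded across parallel operator instances by keyBy, so each mapper instance only ever sees the state slot for the key of the record it is currently processing.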



On Wed, Oct 28, 2015 at 7:39 PM, Maximilian Michels <m...@apache.org> wrote:

> Hi Andra,
>
> What you thought of turns out to be one of the core features of the Flink
> streaming API: Flink's operators support state, and that state can be
> partitioned by key using keyBy(field).
>
> You may use a MapFunction to achieve what you wanted, like so:
>
> public static void main(String[] args) throws Exception {
>
>    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>    env.fromElements(new Tuple2<>(1L, 3L),
>          new Tuple2<>(2L, 5L),
>          new Tuple2<>(6L, 7L),
>          new Tuple2<>(1L, 5L))
>
>        .keyBy(0)
>
>        .map(new StatefulMapper())
>
>        .print();
>
>    env.execute();
>
> }
>
> The output is the following on my machine (the output of the print() sink
> is omitted):
>
> Key: 2 Previous state was: -1 Update state to: 5
> Key: 1 Previous state was: -1 Update state to: 3
> Key: 6 Previous state was: -1 Update state to: 7
> Key: 1 Previous state was: 3 Update state to: 5
>
>
> Cheers,
> Max
>
>
>
> On Wed, Oct 28, 2015 at 4:30 PM, Andra Lungu <lungu.an...@gmail.com>
> wrote:
>
>> Hey guys!
>>
>> I've been thinking about this one today:
>>
>> Say you have a stream of data in the form of (id, value) - This will
>> evidently be a DataStream of Tuple2.
>> I need to cache this data in some sort of static stream (perhaps even a
>> DataSet).
>> Then, if in the input stream, I see an id that was previously stored, I
>> should update its value with the most recent entry.
>>
>> On an example:
>>
>> 1, 3
>> 2, 5
>> 6, 7
>> 1, 5
>>
>> The value cached for the id 1 should be 5.
>>
>> How would you recommend caching the data? And what would be used for the
>> update? A join function?
>>
>> As far as I see things, you cannot really combine DataSets with
>> DataStreams
>> although a DataSet is, in essence, just a finite stream.
>> If this can indeed be done, some pseudocode would be nice :)
>>
>> Thanks!
>> Andra
>>
>
>
