Hi Javier,

Thanks for your question. I've corrected the documentation (will be
online soon).

Cheers,
Max

On Wed, Nov 25, 2015 at 5:19 PM, Stephan Ewen <se...@apache.org> wrote:
> Hi Javier!
>
> You can solve this both using windows, or using manual state.
>
> What is better depends a bit on when you want to have the result (the sum).
> Do you want a result emitted after each update (or do some other operation
> with that value) or do you want only the final sum after a certain time?
>
> For the second variant, I would use a window, for the first variant, you
> could use custom state as follows:
>
> For each element, you take the current state for the key, add the value to
> get the new sum. Then you update the state with the new sum and emit the
> value as well...
>
> Java:
>
> DataStream<Tuple2<String, Long>> stream = ...;
>
> DataStream<Tuple2<String, Long>> result = stream.keyBy(0).map(new
> RollingSum());
>
>
> public class RollingSum extends RichMapFunction<Tuple2<String, Long>,
> Tuple2<String, Long>> {
>
>     private OperatorState<Long> sum;
>
>     @Override
>     public Tuple2<String, Long> map(Tuple2<String, Long> value) {
>         long newSum = sum.value() + value.f1;
>         sum.update(newSum);
>         return new Tuple2<>(value.f0, newSum);
>     }
>
>     @Override
>     public void open(Configuration config) {
>         counter = getRuntimeContext().getKeyValueState("myCounter",
> Long.class, 0L);
>     }
> }
>
>
>
> In Scala, you can write this briefly as:
>
> val stream: DataStream[(String, Int)] = ...
>
> val counts: DataStream[(String, Int)] = stream
>   .keyBy(_._1)
>   .mapWithState((in: (String, Int), sum: Option[Int]) => {
>     val newSum = in._2 + sum.getOrElse(0)
>     ( (in._1, newSum), Some(newSum) )
>  }
>
>
> Does that help?
>
> Thanks also for pointing out the error in the sample code...
>
> Greetings,
> Stephan
>
>
> On Wed, Nov 25, 2015 at 4:55 PM, Lopez, Javier <javier.lo...@zalando.de>
> wrote:
>>
>> Hi,
>>
>> We are trying to do a test using States but we have not been able to
>> achieve our desired result. Basically we have a data stream with data as
>> [{"id":"11","value":123}] and we want to calculate the sum of all values
>> grouping by ID. We were able to achieve this using windows but not with
>> states. The example that is in the documentation
>> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#working-with-state)
>> is not very clear and even has some errors, for example:
>>
>> public class CounterSum implements RichReduceFunction<Long>
>>
>> should be
>>
>> public class CounterSum extends RichReduceFunction<Long>
>>
>> as RichReduceFuncion is a Class, not an interface.
>>
>> We wanted to ask you if you have an example of how to use States with
>> Flink.
>>
>> Thanks in advance for your help.
>>
>>
>
>

Reply via email to