Hi Javier,

Thanks for your question. I've corrected the documentation (it will be online soon).

Cheers,
Max

On Wed, Nov 25, 2015 at 5:19 PM, Stephan Ewen <se...@apache.org> wrote:
> Hi Javier!
>
> You can solve this either with windows or with manual state.
>
> Which is better depends on when you want to have the result (the sum).
> Do you want a result emitted after each update (or do some other operation
> with that value), or do you want only the final sum after a certain time?
>
> For the second variant, I would use a window. For the first variant, you
> could use custom state as follows:
>
> For each element, you take the current state for the key and add the value
> to get the new sum. Then you update the state with the new sum and emit the
> value as well...
>
> Java:
>
> DataStream<Tuple2<String, Long>> stream = ...;
>
> DataStream<Tuple2<String, Long>> result = stream.keyBy(0).map(new RollingSum());
>
>
> public class RollingSum extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
>
>     // Per-key running sum, scoped to the key set by keyBy(0)
>     private OperatorState<Long> sum;
>
>     @Override
>     public Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {
>         long newSum = sum.value() + value.f1;
>         sum.update(newSum);
>         return new Tuple2<>(value.f0, newSum);
>     }
>
>     @Override
>     public void open(Configuration config) {
>         // Default value 0L is used for keys that have no state yet
>         sum = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
>     }
> }
>
>
>
> In Scala, you can write this briefly as:
>
> val stream: DataStream[(String, Int)] = ...
>
> val counts: DataStream[(String, Int)] = stream
>   .keyBy(_._1)
>   .mapWithState((in: (String, Int), sum: Option[Int]) => {
>     val newSum = in._2 + sum.getOrElse(0)
>     ( (in._1, newSum), Some(newSum) )
>   })
>
>
> Does that help?
>
> Thanks also for pointing out the error in the sample code...
>
> Greetings,
> Stephan
>
>
> On Wed, Nov 25, 2015 at 4:55 PM, Lopez, Javier <javier.lo...@zalando.de> wrote:
>>
>> Hi,
>>
>> We are trying to do a test using States but we have not been able to
>> achieve our desired result. Basically we have a data stream with data as
>> [{"id":"11","value":123}] and we want to calculate the sum of all values
>> grouping by ID. We were able to achieve this using windows but not with
>> states. The example that is in the documentation
>> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#working-with-state)
>> is not very clear and even has some errors, for example:
>>
>> public class CounterSum implements RichReduceFunction<Long>
>>
>> should be
>>
>> public class CounterSum extends RichReduceFunction<Long>
>>
>> as RichReduceFunction is a class, not an interface.
>>
>> We wanted to ask you if you have an example of how to use States with
>> Flink.
>>
>> Thanks in advance for your help.
>>
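
For readers who want to check the per-key logic without a Flink cluster, here is a minimal plain-Java sketch of the rolling sum described in the thread: take the current state for the key, add the value, store the new sum, and emit it. It does not use any Flink API. The class name RollingSumSketch, the helper rollingSums, and the HashMap standing in for Flink's key/value state are all assumptions made for illustration, and the second and third input records are invented sample data.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of a per-key rolling sum. No Flink classes are used;
// the HashMap below is only a stand-in for Flink's key/value state.
class RollingSumSketch {

    // Hypothetical helper: returns one (key, runningSum) pair per input element,
    // in input order, mirroring "emit a result after each update".
    static List<Map.Entry<String, Long>> rollingSums(List<Map.Entry<String, Long>> input) {
        Map<String, Long> sumPerKey = new HashMap<>();
        List<Map.Entry<String, Long>> emitted = new ArrayList<>();
        for (Map.Entry<String, Long> element : input) {
            // Take the current state for the key (0 if the key is new)
            long current = sumPerKey.getOrDefault(element.getKey(), 0L);
            // Add the value to get the new sum
            long newSum = current + element.getValue();
            // Update the state, then emit the new sum
            sumPerKey.put(element.getKey(), newSum);
            emitted.add(new SimpleEntry<String, Long>(element.getKey(), newSum));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> input = new ArrayList<>();
        input.add(new SimpleEntry<String, Long>("11", 123L));
        input.add(new SimpleEntry<String, Long>("12", 5L));
        input.add(new SimpleEntry<String, Long>("11", 7L));

        // Emits (11, 123), (12, 5), (11, 130): one result per update
        for (Map.Entry<String, Long> result : rollingSums(input)) {
            System.out.println(result.getKey() + " -> " + result.getValue());
        }
    }
}
```

Unlike this sketch, the Flink version keeps the sum in state managed by the runtime for the current key, so the function itself never holds a map of all keys.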