Hi,

Thanks for the example. We have done it with windows before and it works. We are using state because the data comes with gaps of several days, and we can't handle a window size of several days. That's why we decided to use state.
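For reference, this is roughly the state-based version we are running (a minimal sketch, assuming the getKeyValueState/OperatorState API used further down in this thread; the class name RollingAverage and the state names are just placeholders):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.OperatorState; // package may differ depending on the Flink version
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;

// Keyed, state-based running sum / count / average (hypothetical name RollingAverage).
// Because sum and count live in key/value state rather than in a window, they survive
// arbitrarily long gaps between events for the same key.
public class RollingAverage extends RichMapFunction<Tuple2<String, Double>, Tuple4<String, Double, Long, Double>> {

    private OperatorState<Double> sum;
    private OperatorState<Long> count;

    @Override
    public void open(Configuration config) {
        sum = getRuntimeContext().getKeyValueState("mySum", Double.class, 0D);
        count = getRuntimeContext().getKeyValueState("myCount", Long.class, 0L);
    }

    @Override
    public Tuple4<String, Double, Long, Double> map(Tuple2<String, Double> value) throws Exception {
        double newSum = sum.value() + value.f1;
        long newCount = count.value() + 1;
        sum.update(newSum);
        count.update(newCount);
        // emit (id, sum so far, count so far, running average) for every incoming event
        return new Tuple4<>(value.f0, newSum, newCount, newSum / newCount);
    }
}

Wired up as stream.keyBy(0).map(new RollingAverage()), every event for a key emits the updated (id, sum, count, average), no matter how many days have passed since the previous event for that key.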
On 27 November 2015 at 11:09, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> I’ll try to go into a bit more detail about the windows here. What you can do is this:
>
> DataStream<Tuple3<String, Double, Long>> input = … // fields are (id, sum, count), where count is initialized to 1, similar to word count
>
> DataStream<Tuple3<String, Double, Long>> counts = input
>     .keyBy(0)
>     .timeWindow(Time.minutes(10))
>     .reduce(new MyCountingReducer())
>
> DataStream<Tuple3<String, Double, Long>> result = counts.map( <mapper that divides sum by count> )
>
> Does this help? Here, you don’t even have to deal with state, the windowing system will keep the state (i.e. the reduced value) as internal state in a fault-tolerant fashion.
>
> Cheers,
> Aljoscha
>
> On 26 Nov 2015, at 17:14, Stephan Ewen <se...@apache.org> wrote:
> >
> > Hi!
> >
> > In streaming, there is no "end" of the stream at which you would emit the final sum. That's why there are windows.
> >
> > If you do not want the partial sums, but only the final sum, you need to define the window in which the sum is computed. At the end of that window, that value is emitted. The window can be based on time, counts, or other measures.
> >
> > Greetings,
> > Stephan
> >
> >
> > On Thu, Nov 26, 2015 at 4:07 PM, Lopez, Javier <javier.lo...@zalando.de> wrote:
> > Hi, thanks for the answer. It worked, but not in the way we expected. We expect to have only one sum per ID, but we are getting all the consecutive sums. For example:
> >
> > We expect this: (11,6) but we get this: (11,1) (11,3) (11,6) (the initial values are ID -> 11, values -> 1,2,3). Here is the code we are using for our test:
> >
> > DataStream<Tuple2<String, Double>> stream = ...;
> >
> > DataStream<Tuple4<String, Double, Long, Double>> result = stream.keyBy(0).map(new RollingSum());
> >
> > public static class RollingSum extends RichMapFunction<Tuple2<String, Double>, Tuple4<String, Double, Long, Double>> {
> >
> >     // persistent counter
> >     private OperatorState<Double> sum;
> >     private OperatorState<Long> count;
> >
> >     @Override
> >     public Tuple4<String, Double, Long, Double> map(Tuple2<String, Double> value1) {
> >         try {
> >             Double newSum = sum.value() + value1.f1;
> >             sum.update(newSum);
> >             count.update(count.value() + 1);
> >             return new Tuple4<String, Double, Long, Double>(value1.f0, sum.value(), count.value(), sum.value() / count.value());
> >         } catch (IOException e) {
> >             // TODO Auto-generated catch block
> >             e.printStackTrace();
> >         }
> >         return null;
> >     }
> >
> >     @Override
> >     public void open(Configuration config) {
> >         sum = getRuntimeContext().getKeyValueState("mySum", Double.class, 0D);
> >         count = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
> >     }
> > }
> >
> > We are using a Tuple4 because we want to calculate the sum and the average (so our Tuple is ID, SUM, Count, AVG). Do we need to add another step to get a single value out of it, or is this the expected behavior?
> >
> > Thanks again for your help.
> >
> > On 25 November 2015 at 17:19, Stephan Ewen <se...@apache.org> wrote:
> > Hi Javier!
> >
> > You can solve this both using windows, or using manual state.
> >
> > What is better depends a bit on when you want to have the result (the sum). Do you want a result emitted after each update (or do some other operation with that value), or do you want only the final sum after a certain time?
> >
> > For the second variant, I would use a window; for the first variant, you could use custom state as follows:
> >
> > For each element, you take the current state for the key and add the value to get the new sum. Then you update the state with the new sum and emit the value as well...
> >
> > Java:
> >
> > DataStream<Tuple2<String, Long>> stream = ...;
> >
> > DataStream<Tuple2<String, Long>> result = stream.keyBy(0).map(new RollingSum());
> >
> > public class RollingSum extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
> >
> >     private OperatorState<Long> sum;
> >
> >     @Override
> >     public Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {
> >         long newSum = sum.value() + value.f1;
> >         sum.update(newSum);
> >         return new Tuple2<>(value.f0, newSum);
> >     }
> >
> >     @Override
> >     public void open(Configuration config) {
> >         sum = getRuntimeContext().getKeyValueState("mySum", Long.class, 0L);
> >     }
> > }
> >
> > In Scala, you can write this briefly as:
> >
> > val stream: DataStream[(String, Int)] = ...
> >
> > val counts: DataStream[(String, Int)] = stream
> >   .keyBy(_._1)
> >   .mapWithState((in: (String, Int), sum: Option[Int]) => {
> >     val newSum = in._2 + sum.getOrElse(0)
> >     ((in._1, newSum), Some(newSum))
> >   })
> >
> > Does that help?
> >
> > Thanks also for pointing out the error in the sample code...
> >
> > Greetings,
> > Stephan
> >
> >
> > On Wed, Nov 25, 2015 at 4:55 PM, Lopez, Javier <javier.lo...@zalando.de> wrote:
> > Hi,
> >
> > We are trying to do a test using states, but we have not been able to achieve our desired result. Basically, we have a data stream with data as [{"id":"11","value":123}] and we want to calculate the sum of all values, grouping by ID. We were able to achieve this using windows, but not with states. The example that is in the documentation (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#working-with-state) is not very clear and even has some errors, for example:
> >
> > public class CounterSum implements RichReduceFunction<Long>
> >
> > should be
> >
> > public class CounterSum extends RichReduceFunction<Long>
> >
> > as RichReduceFunction is a class, not an interface.
> >
> > We wanted to ask you if you have an example of how to use states with Flink.
> >
> > Thanks in advance for your help.
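For completeness, a minimal sketch of how the corrected documentation snippet could look (RichReduceFunction is an abstract class, so it is extended rather than implemented; the summing body below is just the obvious implementation, not taken from the docs):

import org.apache.flink.api.common.functions.RichReduceFunction;

public class CounterSum extends RichReduceFunction<Long> {

    @Override
    public Long reduce(Long value1, Long value2) throws Exception {
        // combine two partial sums into one
        return value1 + value2;
    }
}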