Hi,

Thanks for the example. We have done it with windows before, and it works.
We are using state because the data arrives with gaps of several days, and we
can't handle a window size of several days. That's why we decided to use
state.

On 27 November 2015 at 11:09, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> I’ll try to go into a bit more detail about the windows here. What you can
> do is this:
>
> DataStream<Tuple3<String, Double, Long>> input = …; // fields are (id, sum, count),
> // where count is initialized to 1, similar to word count
>
> DataStream<Tuple3<String, Double, Long>> counts = input
>   .keyBy(0)
>   .timeWindow(Time.minutes(10))
>   .reduce(new MyCountingReducer());
>
> DataStream<Tuple3<String, Double, Long>> result = counts.map(/* mapper that
> divides sum by count */);
>
> Does this help? Here, you don’t even have to deal with state: the
> windowing system keeps the state (i.e. the reduced value) internally
> in a fault-tolerant fashion.
>
> Cheers,
> Aljoscha
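The reduce-then-divide pattern above can be sketched in plain Java, without Flink, to show what a single window firing computes. `Partial` stands in for `Tuple3<String, Double, Long>`, and `WindowAverage.reduce` is a hypothetical stand-in for `MyCountingReducer`, whose body the message does not show; both names are assumptions for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-in for Tuple3<String, Double, Long>: (id, sum, count).
final class Partial {
    final String id;
    final double sum;
    final long count;

    Partial(String id, double sum, long count) {
        this.id = id;
        this.sum = sum;
        this.count = count;
    }
}

final class WindowAverage {
    // Hypothetical equivalent of MyCountingReducer: combines two partial
    // results for the same id by adding their sums and their counts.
    static Partial reduce(Partial a, Partial b) {
        return new Partial(a.id, a.sum + b.sum, a.count + b.count);
    }

    // Simulates one window firing: reduce all elements of the window per key,
    // then apply the "divide sum by count" mapper once per key.
    static Map<String, Double> fireWindow(List<Partial> windowContents) {
        Map<String, Partial> reducedPerKey = new LinkedHashMap<>();
        for (Partial p : windowContents) {
            reducedPerKey.merge(p.id, p, WindowAverage::reduce);
        }
        Map<String, Double> averages = new LinkedHashMap<>();
        for (Partial p : reducedPerKey.values()) {
            averages.put(p.id, p.sum / p.count);
        }
        return averages;
    }
}
```

Because every element enters with a count of 1 and the reducer only adds sums and counts, the order in which elements are reduced does not affect the average emitted for the window.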
> > On 26 Nov 2015, at 17:14, Stephan Ewen <se...@apache.org> wrote:
> >
> > Hi!
> >
> > In streaming, there is no "end" of the stream at which you would emit the
> > final sum. That's why there are windows.
> >
> > If you do not want the partial sums but only the final sum, you need to
> > define the window in which the sum is computed. At the end of that window,
> > the value is emitted. The window can be based on time, counts, or other
> > measures.
> >
> > Greetings,
> > Stephan
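A minimal plain-Java sketch of why a window yields a single sum: each element is assigned to the tumbling window that contains its timestamp, and exactly one sum is produced per key and window. It assumes non-negative millisecond timestamps and windows aligned to multiples of the window size; `Event` and `TumblingWindowSum` are hypothetical names, not Flink classes.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// One input element: key, event timestamp in milliseconds, value.
final class Event {
    final String key;
    final long timestampMillis;
    final long value;

    Event(String key, long timestampMillis, long value) {
        this.key = key;
        this.timestampMillis = timestampMillis;
        this.value = value;
    }
}

final class TumblingWindowSum {
    // Start of the tumbling window containing the timestamp, assuming
    // non-negative timestamps and windows aligned to multiples of sizeMillis.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // Produces one sum per (key, window), labelled "key@windowStart".
    static Map<String, Long> sumPerWindow(List<Event> events, long sizeMillis) {
        Map<String, Long> sums = new TreeMap<>();
        for (Event e : events) {
            String slot = e.key + "@" + windowStart(e.timestampMillis, sizeMillis);
            sums.merge(slot, e.value, Long::sum);
        }
        return sums;
    }
}
```

With a 10-minute window (600000 ms), the values 1, 2, 3 for ID 11 mentioned in this thread collapse into a single sum of 6 only if all three timestamps fall into the same window; a later element starts a new window with its own sum.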
> >
> >
> > On Thu, Nov 26, 2015 at 4:07 PM, Lopez, Javier <javier.lo...@zalando.de>
> wrote:
> > Hi, thanks for the answer. It worked, but not in the way we expected. We
> > expect only one sum per ID, but we are getting all the consecutive
> > sums. For example:
> >
> > We expect (11,6), but we get (11,1) (11,3) (11,6) (the input
> > values are ID -> 11, values -> 1,2,3). Here is the code we are using for
> > our test:
> >
> > DataStream<Tuple2<String, Double>> stream = ...;
> >
> > DataStream<Tuple4<String, Double, Long, Double>> result =
> >     stream.keyBy(0).map(new RollingSum());
> >
> > public static class RollingSum extends RichMapFunction<Tuple2<String, Double>,
> >         Tuple4<String, Double, Long, Double>> {
> >
> >     // per-key state: running sum and number of elements seen so far
> >     private OperatorState<Double> sum;
> >     private OperatorState<Long> count;
> >
> >     @Override
> >     public Tuple4<String, Double, Long, Double> map(Tuple2<String, Double> value)
> >             throws Exception {
> >         double newSum = sum.value() + value.f1;
> >         long newCount = count.value() + 1;
> >         sum.update(newSum);
> >         count.update(newCount);
> >         // emits (ID, SUM, COUNT, AVG) for every incoming element
> >         return new Tuple4<>(value.f0, newSum, newCount, newSum / newCount);
> >     }
> >
> >     @Override
> >     public void open(Configuration config) {
> >         sum = getRuntimeContext().getKeyValueState("mySum", Double.class, 0D);
> >         count = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
> >     }
> > }
> >
> >
> > We are using a Tuple4 because we want to calculate both the sum and the
> > average (so our tuple is ID, SUM, COUNT, AVG). Do we need to add another
> > step to get a single value out of it, or is this the expected behavior?
> >
> > Thanks again for your help.
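The behavior reported here can be reproduced with a plain-Java sketch of the same keyed rolling state, using `HashMap`s as a stand-in for Flink's per-key `OperatorState` (an assumption for illustration; `RollingSumSketch` and `RollingResult` are hypothetical names). A stateful map is called once per input element and emits once per call, so ID 11 with values 1, 2, 3 yields every partial result.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for Tuple4<String, Double, Long, Double>: (ID, SUM, COUNT, AVG).
final class RollingResult {
    final String id;
    final double sum;
    final long count;
    final double avg;

    RollingResult(String id, double sum, long count, double avg) {
        this.id = id;
        this.sum = sum;
        this.count = count;
        this.avg = avg;
    }

    @Override
    public String toString() {
        return "(" + id + "," + sum + "," + count + "," + avg + ")";
    }
}

final class RollingSumSketch {
    // Per-key state, playing the role of the "mySum" and "myCounter" states.
    private final Map<String, Double> sumPerKey = new HashMap<>();
    private final Map<String, Long> countPerKey = new HashMap<>();

    // Called once per input element, like RichMapFunction.map: read the
    // key's state (default 0), update it, and emit a result immediately.
    RollingResult map(String id, double value) {
        double newSum = sumPerKey.getOrDefault(id, 0.0) + value;
        long newCount = countPerKey.getOrDefault(id, 0L) + 1;
        sumPerKey.put(id, newSum);
        countPerKey.put(id, newCount);
        return new RollingResult(id, newSum, newCount, newSum / newCount);
    }
}
```

Feeding (11, 1.0), (11, 2.0), (11, 3.0) produces three results with sums 1.0, 3.0 and 6.0; the last one is "final" only in the sense that no further element for that key has arrived yet.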
> >
> > On 25 November 2015 at 17:19, Stephan Ewen <se...@apache.org> wrote:
> > Hi Javier!
> >
> > You can solve this either with windows or with manual state.
> >
> > Which is better depends a bit on when you want to have the result (the
> > sum). Do you want a result emitted after each update (or to do some other
> > operation with that value), or do you want only the final sum after a
> > certain time?
> >
> > For the second variant, I would use a window; for the first variant, you
> > could use custom state as follows:
> >
> > For each element, you take the current state for the key and add the value
> > to get the new sum. Then you update the state with the new sum and emit the
> > new sum as well.
> >
> > Java:
> >
> > DataStream<Tuple2<String, Long>> stream = ...;
> >
> > DataStream<Tuple2<String, Long>> result = stream.keyBy(0).map(new RollingSum());
> >
> > public static class RollingSum extends RichMapFunction<Tuple2<String, Long>,
> >         Tuple2<String, Long>> {
> >
> >     // running sum, kept separately for each key
> >     private OperatorState<Long> sum;
> >
> >     @Override
> >     public Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {
> >         long newSum = sum.value() + value.f1;
> >         sum.update(newSum);
> >         return new Tuple2<>(value.f0, newSum);
> >     }
> >
> >     @Override
> >     public void open(Configuration config) {
> >         // starts at 0 for every key that has not been seen yet
> >         sum = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
> >     }
> > }
> >
> >
> > In Scala, you can write this briefly as:
> >
> > val stream: DataStream[(String, Int)] = ...
> >
> > val counts: DataStream[(String, Int)] = stream
> >   .keyBy(_._1)
> >   .mapWithState((in: (String, Int), sum: Option[Int]) => {
> >     // add the new value to the previous sum for this key (0 if none)
> >     val newSum = in._2 + sum.getOrElse(0)
> >     // emit (key, newSum) and keep newSum as the key's new state
> >     ((in._1, newSum), Some(newSum))
> >   })
> >
> > Does that help?
> >
> > Thanks also for pointing out the error in the sample code...
> >
> > Greetings,
> > Stephan
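The Scala `mapWithState` call in the message above takes a function from an input element and the key's optional previous state to an output plus an optional new state. The Java sketch below emulates that contract outside Flink, with a `HashMap` holding one state value per key. `MapWithStateSketch`, `StepResult` and `MapWithStateExample` are hypothetical names, and treating an empty returned state as "clear this key's state" is an assumption about the operator, not something stated in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

// What the user function returns: the record to emit and the key's new state.
final class StepResult<R, S> {
    final R output;
    final Optional<S> newState; // empty = clear the key's state (assumed semantics)

    StepResult(R output, Optional<S> newState) {
        this.output = output;
        this.newState = newState;
    }
}

// Hypothetical emulation of a keyed "map with state" operator.
final class MapWithStateSketch<T, K, R, S> {
    private final Map<K, S> stateByKey = new HashMap<>();
    private final Function<T, K> keySelector;
    private final BiFunction<T, Optional<S>, StepResult<R, S>> fun;

    MapWithStateSketch(Function<T, K> keySelector,
                       BiFunction<T, Optional<S>, StepResult<R, S>> fun) {
        this.keySelector = keySelector;
        this.fun = fun;
    }

    // Processes one element: look up the key's state, call the user function,
    // store or clear the returned state, and emit the output.
    R process(T in) {
        K key = keySelector.apply(in);
        StepResult<R, S> result = fun.apply(in, Optional.ofNullable(stateByKey.get(key)));
        if (result.newState.isPresent()) {
            stateByKey.put(key, result.newState.get());
        } else {
            stateByKey.remove(key);
        }
        return result.output;
    }
}

final class MapWithStateExample {
    // Java counterpart of the Scala lambda: emit (key, value + previous sum)
    // and keep the new sum as the key's state.
    static MapWithStateSketch<Map.Entry<String, Integer>, String, Map.Entry<String, Integer>, Integer> rollingSum() {
        return new MapWithStateSketch<>(
                Map.Entry::getKey,
                (in, sum) -> {
                    int newSum = in.getValue() + sum.orElse(0);
                    return new StepResult<>(Map.entry(in.getKey(), newSum), Optional.of(newSum));
                });
    }
}
```

`rollingSum()` mirrors the Scala lambda line by line: `sum.orElse(0)` plays the role of `sum.getOrElse(0)`, and `Optional.of(newSum)` the role of `Some(newSum)`.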
> >
> >
> > On Wed, Nov 25, 2015 at 4:55 PM, Lopez, Javier <javier.lo...@zalando.de>
> wrote:
> > Hi,
> >
> > We are trying to do a test using state, but we have not been able to
> > achieve our desired result. Basically, we have a data stream with records
> > such as [{"id":"11","value":123}], and we want to calculate the sum of all
> > values grouped by ID. We were able to achieve this using windows but not
> > with state. The example in the documentation (
> > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#working-with-state)
> > is not very clear and even has some errors, for example:
> >
> > public class CounterSum implements RichReduceFunction<Long>
> > should be
> > public class CounterSum extends RichReduceFunction<Long>
> > as RichReduceFunction is a class, not an interface.
> >
> > We wanted to ask whether you have an example of how to use state with
> > Flink.
> >
> > Thanks in advance for your help.