Re: consecutive stream aggregations

2017-12-18 Thread Ufuk Celebi
You can do this by first keying the stream by userId and then emitting the value you want to average (the session length). You feed that output into an AggregateFunction that groups by time and emits the average per window: input.keyBy(user).flatMap(extractSessionLength()).timeWindowAll(time).
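The two-stage dataflow described above can be sketched in plain Java, without Flink on the classpath. This is only a simulation of the logic: stage 1 stands in for keyBy(userId) plus a session-length-extracting flatMap, stage 2 for the timeWindowAll average. The class and field names (SessionEvent, TwoStageAverage, and the timestamp layout) are illustrative assumptions, not Flink API.

```java
import java.util.HashMap;
import java.util.Map;

// One session per user, bounded by start/end timestamps (seconds).
class SessionEvent {
    final String userId;
    final long startTs, endTs;
    SessionEvent(String userId, long startTs, long endTs) {
        this.userId = userId;
        this.startTs = startTs;
        this.endTs = endTs;
    }
}

class TwoStageAverage {
    // Stage 1 (stands in for keyBy(userId) + flatMap): one session length per user.
    static Map<String, Long> sessionLengths(Iterable<SessionEvent> events) {
        Map<String, Long> lengths = new HashMap<>();
        for (SessionEvent e : events) {
            lengths.put(e.userId, e.endTs - e.startTs);
        }
        return lengths;
    }

    // Stage 2 (stands in for timeWindowAll + average): mean across all users.
    static double averageAcrossUsers(Map<String, Long> lengths) {
        long sum = 0;
        for (long l : lengths.values()) {
            sum += l;
        }
        return (double) sum / lengths.size();
    }
}
```

In real Flink the second stage runs with parallelism 1 (it is an all-window), which is why extracting the per-user length first keeps the heavy lifting parallel.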

Re: consecutive stream aggregations

2017-12-15 Thread Plamen Paskov
In my case I have a lot of users with one session per user. What I'm thinking is to evenly distribute the users, then accumulate, and finally merge all the accumulators. The problem is that I don't know how to achieve this.
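The distribute-then-merge idea works for an average because a (sum, count) pair is all the state the computation needs: merging partial accumulators is exact. The sketch below is a hypothetical plain-Java illustration of that property (the class PartialAverage is invented here, not a Flink type); in Flink, the merge step is what AggregateFunction.merge() performs across parallel instances.

```java
// Partial accumulator for an average: a running (sum, count) pair.
class PartialAverage {
    long sum;
    long count;

    // Fold one session length into this partition's accumulator.
    void add(long sessionLength) {
        sum += sessionLength;
        count++;
    }

    // Combine two partial accumulators; order does not matter, so
    // partitions can be merged in any sequence with the same result.
    static PartialAverage merge(PartialAverage a, PartialAverage b) {
        PartialAverage m = new PartialAverage();
        m.sum = a.sum + b.sum;
        m.count = a.count + b.count;
        return m;
    }

    double result() {
        return (double) sum / count;
    }
}
```

Because merge is associative and commutative, evenly distributing users over partitions and merging at the end yields exactly the single-pass average.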

Re: consecutive stream aggregations

2017-12-15 Thread Ufuk Celebi
You can first aggregate the length per user and emit it downstream. Then you apply the all-window and average all the lengths. Does that make sense?

Re: consecutive stream aggregations

2017-12-15 Thread Ufuk Celebi
You have to specify a window for this to work: stream.keyBy(...).timeWindow(...).aggregate(...)

Re: consecutive stream aggregations

2017-12-15 Thread Plamen Paskov
I think I got your point. What happens now: in order to use aggregate() I need a window, but the window requires keyBy() if I want to parallelize the data. In my case it will not work, because if I create keyBy("userId") then the average will be calculated per userId, but I want the average across all users.

Re: consecutive stream aggregations

2017-12-15 Thread Plamen Paskov
Hi Ufuk, thanks for the answer. It looks like in theory accumulators are the solution to my problem, but as I'm working on a KeyedStream it's not possible to call aggregate() with an AggregateFunction implementation. Am I missing something?

Re: consecutive stream aggregations

2017-12-15 Thread Ufuk Celebi
Hey Plamen, I think what you are looking for is the AggregateFunction, which you can use on keyed streams. The Javadoc [1] contains an example for your use case (averaging). – Ufuk [1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/Aggr
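For reference, Flink's org.apache.flink.api.common.functions.AggregateFunction contract has four methods: createAccumulator(), add(), getResult(), and merge(). Below is a minimal standalone sketch of an averaging implementation in that shape; the interface is re-declared locally so the example compiles without Flink on the classpath, and the AvgAcc accumulator type is an illustrative stand-in for the tuple used in the Javadoc example.

```java
// Local re-declaration mirroring Flink's AggregateFunction<IN, ACC, OUT>
// contract, so this sketch is self-contained.
interface AggregateFunction<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Accumulator: a running (sum, count) pair.
class AvgAcc {
    final long sum;
    final long count;
    AvgAcc(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }
}

// Averages long values (e.g. session lengths in seconds).
class AverageAggregate implements AggregateFunction<Long, AvgAcc, Double> {
    public AvgAcc createAccumulator() {
        return new AvgAcc(0L, 0L);
    }
    public AvgAcc add(Long value, AvgAcc acc) {
        return new AvgAcc(acc.sum + value, acc.count + 1);
    }
    public Double getResult(AvgAcc acc) {
        return (double) acc.sum / acc.count;
    }
    public AvgAcc merge(AvgAcc a, AvgAcc b) {
        return new AvgAcc(a.sum + b.sum, a.count + b.count);
    }
}
```

In an actual job this would be passed to a windowed stream, e.g. stream.keyBy(...).timeWindow(...).aggregate(new AverageAggregate()), as discussed earlier in the thread.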

consecutive stream aggregations

2017-12-15 Thread Plamen Paskov
Hi, I'm trying to calculate the running average of session length, and I want to trigger the computation on a regular, let's say 2-minute, interval. I'm trying to do it like this: package flink; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import lombok.ToString; import o