Hi Ufuk,

Thanks for your answer. In theory the accumulators look like the solution to my problem, but since I'm working on a KeyedStream it's not possible to call aggregate with an AggregateFunction implementation. Am I missing something?


On 15.12.2017 15:46, Ufuk Celebi wrote:
Hey Plamen,

I think what you are looking for is the AggregateFunction, which you
can use on keyed streams. The Javadoc [1] contains an example for your
use case (averaging).

– Ufuk

[1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
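
For readers following the thread, the averaging pattern mentioned above can be sketched in plain Java. This is only a minimal sketch and not Flink code: the SimpleAggregate interface is a hypothetical stand-in that mirrors the general shape of an accumulator-based aggregate (create, add, get result, merge), and AvgAccumulator and AverageLength are names made up for illustration.

```java
// Hypothetical stand-in mirroring the general shape of an accumulator-based
// aggregate function. It is NOT the Flink interface, only an illustration.
interface SimpleAggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Accumulator holding the running sum and the number of values seen.
class AvgAccumulator {
    long sum;
    long count;
}

// Averages session lengths incrementally instead of buffering all events.
class AverageLength implements SimpleAggregate<Long, AvgAccumulator, Double> {
    @Override
    public AvgAccumulator createAccumulator() {
        return new AvgAccumulator();
    }

    @Override
    public AvgAccumulator add(Long value, AvgAccumulator accumulator) {
        accumulator.sum += value;
        accumulator.count++;
        return accumulator;
    }

    @Override
    public Double getResult(AvgAccumulator accumulator) {
        // Return 0.0 for an empty accumulator (an arbitrary choice for this sketch).
        if (accumulator.count == 0) {
            return 0.0;
        }
        // Cast before dividing so the result is not truncated.
        return (double) accumulator.sum / accumulator.count;
    }

    @Override
    public AvgAccumulator merge(AvgAccumulator a, AvgAccumulator b) {
        a.sum += b.sum;
        a.count += b.count;
        return a;
    }

    public static void main(String[] args) {
        AverageLength agg = new AverageLength();
        AvgAccumulator acc = agg.createAccumulator();
        acc = agg.add(160L, acc);
        acc = agg.add(100L, acc);
        System.out.println(agg.getResult(acc)); // prints 130.0
    }
}
```

The accumulator keeps only a sum and a count, so partial results from different partitions could be combined through merge without replaying the individual events.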

On Fri, Dec 15, 2017 at 11:55 AM, Plamen Paskov
<plamen.pas...@next-stream.com> wrote:
Hi,

I'm trying to calculate the running average of session length, and I want to
trigger the computation at a regular interval, say every 2 minutes. I'm
trying to do it like this:

package flink;

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.TimeZone;


public class StreamingJob {
     public static void main(String[] args) throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

         SingleOutputStreamOperator<Event> sessions = env
                 .socketTextStream("localhost", 9000, "\n")
                 .map(new MapFunction<String, Event>() {
                     @Override
                     public Event map(String value) throws Exception {
                         String[] row = value.split(",");
                         return new Event(Long.valueOf(row[0]), row[1],
                                 Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
                     }
                 })
                 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
                     @Override
                     public long extractTimestamp(Event element) {
                         return element.timestamp;
                     }
                 })
                 .keyBy("userId", "sessionId")
                 .maxBy("length");


         sessions
                 .timeWindowAll(Time.seconds(60), Time.seconds(30))
                 .apply(new AllWindowFunction<Event, Avg, TimeWindow>() {
                     @Override
                     public void apply(TimeWindow window, Iterable<Event> values, Collector<Avg> out) throws Exception {
                         long sum = 0;
                         int count = 0;

                         for (Event event : values) {
                             sum += event.length;
                             count++;
                         }

                         // Cast before dividing so the average is not truncated by integer division.
                         double avg = (double) sum / count;
                         LocalDateTime windowStart = LocalDateTime.ofInstant(
                                 Instant.ofEpochMilli(window.getStart()), TimeZone.getDefault().toZoneId());
                         LocalDateTime windowEnd = LocalDateTime.ofInstant(
                                 Instant.ofEpochMilli(window.getEnd()), TimeZone.getDefault().toZoneId());
                         out.collect(new Avg(avg, windowStart.toString(), windowEnd.toString()));
                     }
                 });

         env.execute();
     }

     @AllArgsConstructor
     @NoArgsConstructor
     @ToString
     public static class Avg {
         public double length;
         public String windowStart;
         public String windowEnd;
     }

     @AllArgsConstructor
     @NoArgsConstructor
     @ToString
     public static class Event {
         public long userId;
         public String sessionId;
         public long length;
         public long timestamp;
     }
}

First I want to extract the last session event for every user-session,
because it contains the total session length. Then I want to calculate the
average session length based on the data from the previous operation
(the sessions variable).

Example:

1,s1,100,2017-12-13 11:58:01
1,s1,150,2017-12-13 11:58:02
1,s1,160,2017-12-13 11:58:03
2,s1,100,2017-12-13 11:58:04

The sessions variable should contain these rows:
1,s1,160,2017-12-13 11:58:03
2,s1,100,2017-12-13 11:58:04

but instead, for each incoming event, it returns only the max-length row for that event's key.
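
The intended result for the example rows can be sketched in plain Java, outside of Flink. This is a minimal sketch under the assumption stated in this message that the last event of a session is also the one with the largest length; the class SessionAverageSketch and its method names are hypothetical and only illustrate the two steps (max per user-session, then average).

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SessionAverageSketch {
    // Step 1: for each "userId,sessionId" key, keep the largest length seen.
    static Map<String, Long> maxLengthPerSession(String[] rows) {
        Map<String, Long> maxByKey = new LinkedHashMap<>();
        for (String row : rows) {
            String[] fields = row.split(",");
            String key = fields[0] + "," + fields[1];
            long length = Long.parseLong(fields[2]);
            maxByKey.merge(key, length, Math::max);
        }
        return maxByKey;
    }

    // Step 2: average the per-session lengths, casting to avoid truncation.
    static double average(Map<String, Long> maxByKey) {
        if (maxByKey.isEmpty()) {
            return 0.0; // arbitrary choice for an empty input in this sketch
        }
        long sum = 0;
        for (long length : maxByKey.values()) {
            sum += length;
        }
        return (double) sum / maxByKey.size();
    }

    public static void main(String[] args) {
        String[] rows = {
            "1,s1,100,2017-12-13 11:58:01",
            "1,s1,150,2017-12-13 11:58:02",
            "1,s1,160,2017-12-13 11:58:03",
            "2,s1,100,2017-12-13 11:58:04"
        };
        Map<String, Long> sessions = maxLengthPerSession(rows);
        System.out.println(sessions);          // prints {1,s1=160, 2,s1=100}
        System.out.println(average(sessions)); // prints 130.0
    }
}
```

For the four example rows this yields one entry per user-session (160 and 100) and an average of 130.0.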

Questions:
- How can I collect the data for all groups in the sessions variable?
- Is there another way to achieve this functionality? With my
implementation the average will be computed on a single node, because sessions
is of type SingleOutputStreamOperator<Event>.
- Can I use ContinuousEventTimeTrigger to trigger at regular intervals?

Thanks
