Hi,

I'm trying to calculate the running average of session length, and I want to trigger the computation at a regular interval, let's say every 2 minutes. I'm trying to do it like this:

package flink;

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.TimeZone;


public class StreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        SingleOutputStreamOperator<Event> sessions = env
                .socketTextStream("localhost",9000,"\n")
                .map(new MapFunction<String, Event>() {
                    @Override public Event map(String value) throws Exception {
                        // expected input: userId,sessionId,length,yyyy-MM-dd HH:mm:ss
                        String[] row = value.split(",");
                        return new Event(Long.valueOf(row[0]), row[1],
                                Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
                    }
                })
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
                    @Override public long extractTimestamp(Event element) {
                        return element.timestamp;
                    }
                })
                .keyBy("userId","sessionId")
                .maxBy("length");


        sessions
                .timeWindowAll(Time.seconds(60), Time.seconds(30))
                .apply(new AllWindowFunction<Event, Avg, TimeWindow>() {
                    @Override public void apply(TimeWindow window, Iterable<Event> values,
                            Collector<Avg> out) throws Exception {
                        long sum = 0;
                        int count = 0;

                        for (Event event : values) {
                            sum += event.length;
                            count++;
                        }

                        // cast before dividing, otherwise sum / count is integer division
                        double avg = (double) sum / count;
                        LocalDateTime windowStart = LocalDateTime.ofInstant(
                                Instant.ofEpochMilli(window.getStart()), TimeZone.getDefault().toZoneId());
                        LocalDateTime windowEnd = LocalDateTime.ofInstant(
                                Instant.ofEpochMilli(window.getEnd()), TimeZone.getDefault().toZoneId());
                        out.collect(new Avg(avg, windowStart.toString(), windowEnd.toString()));
                    }
                });

        env.execute();
    }

    @AllArgsConstructor @NoArgsConstructor @ToString public static class Avg {
        public double length;
        public String windowStart;
        public String windowEnd;
    }

    @AllArgsConstructor @NoArgsConstructor @ToString public static class Event {
        public long userId;
        public String sessionId;
        public long length;
        public long timestamp;
    }
}
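
The average in the window function needs a cast before the division: in Java, `sum / count` with a `long` sum and an `int` count is integer division, so the fractional part is dropped before the result is widened to `double`. A minimal plain-Java sketch (no Flink involved; the class and method names are hypothetical) comparing the two forms on three illustrative lengths, 160, 100 and 150:

```java
// Hypothetical demo class: shows why the cast must happen before the division.
class AverageDivision {

    // Integer division happens first; only the truncated quotient is widened to double.
    static double truncatedAverage(long sum, int count) {
        return sum / count;
    }

    // Casting one operand makes the whole division floating-point.
    static double exactAverage(long sum, int count) {
        return (double) sum / count;
    }

    public static void main(String[] args) {
        long sum = 160 + 100 + 150; // 410
        int count = 3;
        System.out.println(truncatedAverage(sum, count)); // 136.0
        System.out.println(exactAverage(sum, count));
    }
}
```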

First, I want to extract the last event of every user session, because it
contains the total session length. Then I want to calculate the average
session length from the output of that operation (the sessions variable).

Example:

1,s1,100,2017-12-13 11:58:01
1,s1,150,2017-12-13 11:58:02
1,s1,160,2017-12-13 11:58:03
2,s1,100,2017-12-13 11:58:04

The sessions variable should contain these rows:
1,s1,160,2017-12-13 11:58:03
2,s1,100,2017-12-13 11:58:04
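
To pin down the intended result, here is a plain-Java batch sketch (no Flink; the `LastEventPerSession` class, its `Row` type and helper methods are hypothetical) that keeps, for each (userId, sessionId) pair, the event with the latest timestamp and then averages the session lengths. It keys on the latest timestamp, following the "last session event" description; with lengths that only grow within a session this picks the same rows as maxBy("length"). On the four example rows it keeps the two rows listed above and prints an average of 130.0.

```java
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical batch illustration of what "sessions" is meant to contain (no Flink involved).
class LastEventPerSession {

    static final class Row {
        final long userId;
        final String sessionId;
        final long length;
        final long timestamp;

        Row(long userId, String sessionId, long length, long timestamp) {
            this.userId = userId;
            this.sessionId = sessionId;
            this.length = length;
            this.timestamp = timestamp;
        }

        // Parses "userId,sessionId,length,yyyy-MM-dd HH:mm:ss", like the job's MapFunction.
        static Row parse(String line) {
            String[] f = line.split(",");
            return new Row(Long.parseLong(f[0]), f[1], Long.parseLong(f[2]),
                    Timestamp.valueOf(f[3]).getTime());
        }
    }

    // Keeps only the latest event (by timestamp) for each (userId, sessionId) pair.
    static List<Row> lastPerSession(List<Row> rows) {
        Map<String, Row> latest = new LinkedHashMap<>();
        for (Row r : rows) {
            String key = r.userId + "|" + r.sessionId;
            latest.merge(key, r, (old, cur) -> cur.timestamp >= old.timestamp ? cur : old);
        }
        return new ArrayList<>(latest.values());
    }

    // Average session length, dividing as double to keep the fractional part.
    static double averageLength(List<Row> sessions) {
        long sum = 0;
        for (Row r : sessions) {
            sum += r.length;
        }
        return (double) sum / sessions.size();
    }

    public static void main(String[] args) {
        String[] lines = {
            "1,s1,100,2017-12-13 11:58:01",
            "1,s1,150,2017-12-13 11:58:02",
            "1,s1,160,2017-12-13 11:58:03",
            "2,s1,100,2017-12-13 11:58:04"
        };
        List<Row> input = new ArrayList<>();
        for (String line : lines) {
            input.add(Row.parse(line));
        }
        List<Row> sessions = lastPerSession(input);
        for (Row r : sessions) {
            System.out.println(r.userId + "," + r.sessionId + "," + r.length);
        }
        System.out.println(averageLength(sessions)); // 130.0
    }
}
```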

but instead it returns, for each incoming event, only the max-length row for that event's own user and session.
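
This matches how a rolling aggregation such as maxBy on a keyed stream behaves: it does not wait for a key to be "complete", but emits an updated result for every incoming element, and each result reflects only the events seen so far for that element's key. A plain-Java simulation of that emission pattern (hypothetical class name, no Flink; it emits just the key and the running max length rather than the whole event):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of a keyed rolling max: one output per input element.
class RollingMaxBy {

    // Each input is {userId, sessionId, length}; returns what is emitted after each element.
    static List<String> simulate(String[][] events) {
        Map<String, Long> maxSoFar = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String[] e : events) {
            String key = e[0] + "," + e[1];
            long length = Long.parseLong(e[2]);
            // Update the running max for this key only, then emit it immediately.
            long max = maxSoFar.merge(key, length, Long::max);
            emitted.add(key + "," + max);
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[][] events = {
            {"1", "s1", "100"}, {"1", "s1", "150"}, {"1", "s1", "160"}, {"2", "s1", "100"}
        };
        // Prints four lines, one per input event, not one per session.
        simulate(events).forEach(System.out::println);
    }
}
```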

Questions:
- How can I collect the data for all groups in the sessions variable?
- Is there another way to achieve this functionality? With my implementation
the average will be computed on a single node, because sessions is of type
SingleOutputStreamOperator<Event>.
- Can I use ContinuousEventTimeTrigger to trigger at regular intervals?
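
On the second question, one common way to spread an average over several nodes is a two-stage aggregation: compute partial (sum, count) pairs in parallel (for example per key), then combine the few partial pairs in one final step and divide only at the end. A plain-Java sketch of such a partial aggregate, with hypothetical names and no Flink API, showing that merging two partials gives the same result as one pass over all values:

```java
// Hypothetical partial aggregate for a two-stage average.
class PartialAverage {
    final long sum;
    final long count;

    PartialAverage(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    // Builds a partial result from the values one parallel instance has seen.
    static PartialAverage of(long... values) {
        long s = 0;
        for (long v : values) {
            s += v;
        }
        return new PartialAverage(s, values.length);
    }

    // Combining two partials is component-wise addition; no division yet.
    PartialAverage merge(PartialAverage other) {
        return new PartialAverage(sum + other.sum, count + other.count);
    }

    // Divide only once, at the very end.
    double result() {
        return (double) sum / count;
    }

    public static void main(String[] args) {
        PartialAverage a = PartialAverage.of(160, 150); // e.g. partition A
        PartialAverage b = PartialAverage.of(100);      // e.g. partition B
        System.out.println(a.merge(b).result());
        System.out.println(PartialAverage.of(160, 150, 100).result());
    }
}
```

Averaging the two partial averages instead would give (155 + 100) / 2 = 127.5, which is wrong because the partitions hold different numbers of events; carrying the count along avoids that.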

Thanks
