Hi,
I'm trying to calculate the running average of session length, and I want
to trigger the computation at a regular interval, say every 2 minutes. I'm
trying to do it like this:
package flink;

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.TimeZone;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Parse "userId,sessionId,length,timestamp" lines, then keep the
        // max-length event per (userId, sessionId).
        SingleOutputStreamOperator<Event> sessions = env
                .socketTextStream("localhost", 9000, "\n")
                .map(new MapFunction<String, Event>() {
                    @Override
                    public Event map(String value) throws Exception {
                        String[] row = value.split(",");
                        return new Event(Long.parseLong(row[0]), row[1],
                                Long.parseLong(row[2]),
                                Timestamp.valueOf(row[3]).getTime());
                    }
                })
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
                            @Override
                            public long extractTimestamp(Event element) {
                                return element.timestamp;
                            }
                        })
                .keyBy("userId", "sessionId")
                .maxBy("length");

        // Average the session lengths over a 60 s window sliding every 30 s.
        sessions
                .timeWindowAll(Time.seconds(60), Time.seconds(30))
                .apply(new AllWindowFunction<Event, Avg, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<Event> values,
                                      Collector<Avg> out) throws Exception {
                        long sum = 0;
                        int count = 0;
                        for (Event event : values) {
                            sum += event.length;
                            count++;
                        }
                        // Cast first; sum / count alone is integer division.
                        double avg = (double) sum / count;
                        LocalDateTime windowStart = LocalDateTime.ofInstant(
                                Instant.ofEpochMilli(window.getStart()),
                                TimeZone.getDefault().toZoneId());
                        LocalDateTime windowEnd = LocalDateTime.ofInstant(
                                Instant.ofEpochMilli(window.getEnd()),
                                TimeZone.getDefault().toZoneId());
                        out.collect(new Avg(avg, windowStart.toString(),
                                windowEnd.toString()));
                    }
                });

        env.execute();
    }

    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    public static class Avg {
        public double length;
        public String windowStart;
        public String windowEnd;
    }

    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    public static class Event {
        public long userId;
        public String sessionId;
        public long length;
        public long timestamp;
    }
}

First I want to extract the last session event for every user-session
pair, because it contains the total session length. Then I want to
calculate the average session length from the output of that first step
(the sessions variable).
Example:
1,s1,100,2017-12-13 11:58:01
1,s1,150,2017-12-13 11:58:02
1,s1,160,2017-12-13 11:58:03
2,s1,100,2017-12-13 11:58:04
The sessions variable should contain these rows:
1,s1,160,2017-12-13 11:58:03
2,s1,100,2017-12-13 11:58:04
Instead, for each incoming event it only returns the max-length row for
that event's key.
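
To make the difference concrete, here is a minimal plain-Java sketch (no Flink involved) of what I think is happening versus what I want. It assumes a keyed rolling max emits one record per input event, carrying the maximum seen so far for that event's key. The names RollingMaxSketch, Row, rollingMaxBy, maxPerKey and averageLength are made up for this illustration and are not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RollingMaxSketch {

    // Hypothetical stand-in for the Event class above (timestamp left out).
    public static final class Row {
        public final long userId;
        public final String sessionId;
        public final long length;

        public Row(long userId, String sessionId, long length) {
            this.userId = userId;
            this.sessionId = sessionId;
            this.length = length;
        }

        String key() {
            return userId + "|" + sessionId;
        }

        @Override
        public String toString() {
            return userId + "," + sessionId + "," + length;
        }
    }

    // Assumed rolling behaviour: one output per input event, holding the
    // max-length row seen so far for that event's key.
    public static List<Row> rollingMaxBy(List<Row> input) {
        Map<String, Row> best = new HashMap<>();
        List<Row> out = new ArrayList<>();
        for (Row r : input) {
            Row current = best.get(r.key());
            if (current == null || r.length > current.length) {
                best.put(r.key(), r);
                current = r;
            }
            out.add(current);
        }
        return out;
    }

    // What I actually want: exactly one row per key, the max-length one.
    public static List<Row> maxPerKey(List<Row> input) {
        Map<String, Row> best = new LinkedHashMap<>();
        for (Row r : input) {
            best.merge(r.key(), r, (a, b) -> b.length > a.length ? b : a);
        }
        return new ArrayList<>(best.values());
    }

    // Average session length; the cast avoids integer division.
    public static double averageLength(List<Row> rows) {
        long sum = 0;
        for (Row r : rows) {
            sum += r.length;
        }
        return (double) sum / rows.size();
    }

    public static void main(String[] args) {
        List<Row> input = Arrays.asList(
                new Row(1, "s1", 100),
                new Row(1, "s1", 150),
                new Row(1, "s1", 160),
                new Row(2, "s1", 100));

        System.out.println(rollingMaxBy(input));             // four rows, one per event
        System.out.println(maxPerKey(input));                // [1,s1,160, 2,s1,100]
        System.out.println(averageLength(maxPerKey(input))); // 130.0
    }
}
```

If the average is taken over the rolling output instead of the per-key output, the first session is counted three times, which is why I need one row per session before averaging.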
Questions:
- How can I collect the data for all groups in the sessions variable?
- Is there another way to achieve this? With my implementation the
average will be computed on a single node, because sessions is of type
SingleOutputStreamOperator<Event>.
- Can I use ContinuousEventTimeTrigger to trigger at regular intervals?
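
For context on the "regular intervals" part: as far as I understand, the sliding window in my code (60 s size, 30 s slide) places every event into size / slide = 2 overlapping windows, so a result should come out once per slide as event time advances. The plain-Java sketch below shows the assignment I have in mind. It assumes windows are aligned to the epoch with no offset and that timestamps are non-negative; SlidingWindowSketch and windowStarts are names I made up, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {

    // Returns the start times (ms) of every sliding window [start, start + size)
    // that contains the given timestamp, newest window first.
    // Assumption: windows are aligned to the epoch (offset 0), timestamp >= 0.
    public static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - (timestampMs % slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 60_000L;   // 60 s window, as in timeWindowAll above
        long slide = 30_000L;  // 30 s slide
        long ts = 95_000L;     // hypothetical event time, 95 s after the epoch

        for (long start : windowStarts(ts, size, slide)) {
            System.out.println("[" + start + ", " + (start + size) + ")");
        }
        // prints:
        // [90000, 150000)
        // [60000, 120000)
    }
}
```

With a 2 minute slide instead of 30 s, the same assignment would give one result every 2 minutes, so my question is really whether ContinuousEventTimeTrigger is needed on top of that.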
Thanks