I think it will not solve the problem. If I set ContinuousEventTimeTrigger
to 10 seconds and allowedLateness(Time.seconds(60)) (because I don't want
to discard events from different users that arrive late), then I might
receive more than one row for a single user, depending on how many windows
that user's events create. That will make the average computation wrong.
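A small plain-Java illustration of this concern (not Flink code; the class `AverageSkew`, its methods and the numbers are hypothetical). When one user produces two result rows, a plain average over rows weighs that user twice. The second method shows one possible correction, under the assumption that a later row for the same user supersedes the earlier one.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java illustration, not Flink API code.
class AverageSkew {

    // One emitted result row: a user and the session length reported for it.
    static final class Row {
        final String userId;
        final long length;

        Row(String userId, long length) {
            this.userId = userId;
            this.length = length;
        }
    }

    // Naive average over every emitted row: a user with two rows weighs twice.
    static double averageOverRows(List<Row> rows) {
        if (rows.isEmpty()) return 0.0;
        long sum = 0;
        for (Row r : rows) sum += r.length;
        return (double) sum / rows.size();
    }

    // Assumption: a later row for the same user supersedes the earlier one,
    // so only the last row per user is averaged.
    static double averageOverLatestPerUser(List<Row> rows) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Row r : rows) latest.put(r.userId, r.length); // later rows overwrite earlier ones
        if (latest.isEmpty()) return 0.0;
        long sum = 0;
        for (long v : latest.values()) sum += v;
        return (double) sum / latest.size();
    }

    public static void main(String[] args) {
        // User "a" shows up in two windows, user "b" in one.
        List<Row> rows = Arrays.asList(new Row("a", 40), new Row("a", 100), new Row("b", 20));
        System.out.println(averageOverRows(rows));          // about 53.33: (40 + 100 + 20) / 3
        System.out.println(averageOverLatestPerUser(rows)); // 60.0: (100 + 20) / 2
    }
}
```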
On 22.12.2017 12:10, Piotr Nowojski wrote:
OK, I think I now understand your problem.
Wouldn’t it be enough if you changed the last global window to something
like this:
lastUserSession
    .timeWindowAll(Time.seconds(10))
    .aggregate(new AverageSessionLengthAcrossAllUsers())
    .print();
(As a side note, maybe you should use ContinuousEventTimeTrigger in the
first window.) This way it will aggregate and calculate the average
session length over only the last "preview results" of the 60-second user
windows (emitted every 10 seconds by the first aggregation).
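A plain-Java simulation of this two-stage idea (hypothetical names, not the Flink API). The first stage is modeled as emitting one preview row per user at each 10-second firing; the second stage puts rows into tumbling 10-second slots by emission time and averages each slot separately. It assumes each slot receives exactly one firing of the first stage; the sketch does not show how a real job would guarantee that alignment.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java simulation, not Flink API code.
class PreviewAverages {

    // One "preview" row emitted by the per-user window when its trigger fires.
    static final class Preview {
        final long emitTimeMs; // when the first-stage window fired
        final String userId;
        final long length;     // session length at that firing

        Preview(long emitTimeMs, String userId, long length) {
            this.emitTimeMs = emitTimeMs;
            this.userId = userId;
            this.length = length;
        }
    }

    // Second stage: tumbling slots of slotMs, keyed by slot start; each slot
    // averages only the previews emitted inside it.
    static Map<Long, Double> averagePerSlot(List<Preview> previews, long slotMs) {
        Map<Long, long[]> sumAndCount = new TreeMap<>();
        for (Preview p : previews) {
            long slotStart = p.emitTimeMs - Math.floorMod(p.emitTimeMs, slotMs);
            long[] acc = sumAndCount.computeIfAbsent(slotStart, k -> new long[2]);
            acc[0] += p.length; // sum
            acc[1]++;           // count
        }
        Map<Long, Double> averages = new TreeMap<>();
        for (Map.Entry<Long, long[]> e : sumAndCount.entrySet()) {
            averages.put(e.getKey(), (double) e.getValue()[0] / e.getValue()[1]);
        }
        return averages;
    }

    public static void main(String[] args) {
        // Two firings of the first stage, 10 s apart, each emitting one row per user.
        List<Preview> previews = Arrays.asList(
                new Preview(10_000, "a", 30), new Preview(10_000, "b", 10),
                new Preview(20_000, "a", 50), new Preview(20_000, "b", 10));
        System.out.println(averagePerSlot(previews, 10_000)); // {10000=20.0, 20000=30.0}
    }
}
```

Under that alignment assumption each user contributes at most one row per slot, so earlier previews do not inflate the per-slot average.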
Piotrek
On 21 Dec 2017, at 15:18, Plamen Paskov
<plamen.pas...@next-stream.com> wrote:
Imagine a case where I want to run a computation every X seconds over a
1-day window: I want to calculate the average session length for the
current day every X seconds. Is there an easy way to achieve that?
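One way to picture this (a sketch only; `DailySessionAverage` and its methods are hypothetical, not a Flink API) is to keep, per day, only the longest known length of each (userId, sessionId) and recompute the average from that state whenever a periodic timer fires. Because a re-reported session overwrites its entry instead of adding a new one, calling the snapshot every X seconds does not double count. In Flink, such logic might live in a stateful ProcessFunction with timers; that mapping is an assumption, not something shown in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java sketch, not Flink API code.
class DailySessionAverage {

    static final long DAY_MS = 24L * 60 * 60 * 1000;

    // Longest length seen so far per "userId/sessionId" for the current day.
    private final Map<String, Long> longestPerSession = new HashMap<>();
    private long currentDay = Long.MIN_VALUE;

    void update(long timestampMs, long userId, String sessionId, long length) {
        long day = Math.floorDiv(timestampMs, DAY_MS);
        if (day < currentDay) {
            return; // late event for a day that is already closed: ignored in this sketch
        }
        if (day > currentDay) {
            longestPerSession.clear(); // a new day starts
            currentDay = day;
        }
        // A re-reported session overwrites its entry, keeping the longest value
        longestPerSession.merge(userId + "/" + sessionId, length, Long::max);
    }

    // Safe to call every X seconds: each session contributes exactly once.
    double snapshotAverage() {
        if (longestPerSession.isEmpty()) return 0.0;
        long sum = 0;
        for (long v : longestPerSession.values()) sum += v;
        return (double) sum / longestPerSession.size();
    }

    public static void main(String[] args) {
        DailySessionAverage avg = new DailySessionAverage();
        avg.update(1_000, 1L, "s1", 30);
        avg.update(2_000, 1L, "s1", 90); // same session reported again, now longer
        avg.update(3_000, 2L, "s2", 30);
        System.out.println(avg.snapshotAverage()); // 60.0: (90 + 30) / 2
    }
}
```

Ignoring events for an already-closed day keeps the sketch short; a real job would need an explicit policy for late data.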
On 21.12.2017 16:06, Piotr Nowojski wrote:
Hi,
You defined a tumbling window
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows)
of 60 seconds, triggered every 10 seconds. This means that each
input element can be processed/averaged up to 6 times (there is no
other way if you trigger each window multiple times).
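The "up to 6 times" figure follows from 60 s / 10 s = 6 firings per window. A plain-Java sketch (the helper is hypothetical and idealized so that firings are aligned to the window start, at 10 s, 20 s, ..., 60 s) counting how many firings include an element that arrives at a given offset into the window:

```java
// Hypothetical plain-Java illustration, not Flink API code.
class FiringCount {

    // Counts firings at intervalMs, 2 * intervalMs, ..., windowMs that happen
    // after an element arrived at offsetMs within the window.
    static int firingsContaining(long offsetMs, long windowMs, long intervalMs) {
        int count = 0;
        for (long fire = intervalMs; fire <= windowMs; fire += intervalMs) {
            if (offsetMs < fire) count++;
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(firingsContaining(5_000, 60_000, 10_000));  // 6: arrives before the first firing
        System.out.println(firingsContaining(55_000, 60_000, 10_000)); // 1: only the last firing
    }
}
```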
I am not sure what you are trying to achieve, but please refer to
the documentation about the different window types (tumbling, sliding,
session); maybe it will clarify things for you:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners
If you want to avoid duplicated processing, use either a tumbling
window with the default trigger (which fires at the end of the window)
or session windows.
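For comparison, a plain-Java sketch of how gap-based sessions group one user's event timestamps. The `Sessionizer` class is hypothetical; it only mimics the idea behind session windows, where each event roughly opens a window of `gap` length and overlapping windows are merged. Each resulting session is produced once, so its length would enter an average only once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java illustration, not Flink API code.
class Sessionizer {

    // Returns {start, end} pairs, where end = last event in the session + gapMs.
    // An event joins the open session if it arrives no later than its current end.
    static List<long[]> sessions(long[] timestamps, long gapMs) {
        long[] ts = timestamps.clone();
        Arrays.sort(ts); // sessions are defined on event time, so sort first
        List<long[]> out = new ArrayList<>();
        if (ts.length == 0) return out;
        long start = ts[0];
        long end = ts[0] + gapMs;
        for (int i = 1; i < ts.length; i++) {
            if (ts[i] <= end) {
                end = ts[i] + gapMs; // extend the open session
            } else {
                out.add(new long[] {start, end}); // gap exceeded: close the session
                start = ts[i];
                end = ts[i] + gapMs;
            }
        }
        out.add(new long[] {start, end});
        return out;
    }

    public static void main(String[] args) {
        long[] events = {30_000, 0, 20_000, 200_000};
        for (long[] s : sessions(events, 60_000)) {
            System.out.println(Arrays.toString(s)); // [0, 90000] then [200000, 260000]
        }
    }
}
```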
Piotrek
On 21 Dec 2017, at 13:29, Plamen Paskov
<plamen.pas...@next-stream.com> wrote:
Hi guys,
I have the following code:
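import java.sql.Timestamp;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;

// Stage 1: longest session per (userId, sessionId) in 60 s event-time
// windows, emitted every 10 s of processing time
SingleOutputStreamOperator<Event> lastUserSession = env
    .socketTextStream("localhost", 9000, "\n")
    // Parse one CSV line per event (field order as expected by Event's constructor)
    .map(new MapFunction<String, Event>() {
        @Override
        public Event map(String value) throws Exception {
            String[] row = value.split(",");
            return new Event(Long.valueOf(row[0]), row[1],
                    Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
        }
    })
    // Use the event's own timestamp, tolerating up to 10 s of out-of-orderness
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(Event element) {
                return element.timestamp;
            }
        })
    .keyBy("userId", "sessionId")
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
    .maxBy("length", false);

// Stage 2: average session length across all users in 60 s windows
lastUserSession
    .timeWindowAll(Time.seconds(60))
    .aggregate(new AverageSessionLengthAcrossAllUsers())
    .print();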
What I'm trying to achieve is to calculate the average session length every
10 seconds. The problem is that, since the window length is 60 seconds and a
computation is triggered every 10 seconds, I receive duplicate events in my
average calculation method, so the average is not correct. If I move
ContinuousProcessingTimeTrigger down before
AverageSessionLengthAcrossAllUsers(), then it doesn't trigger every 10 seconds.
Any other suggestions on how to work around this?
Thanks