Hi,
could you please try adding this custom watermark debugger to see what's
going on with the element timestamps and watermarks:
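// Imports for the enclosing file:
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Pass-through operator: prints every element and watermark it sees.
public static class WatermarkDebugger<T>
        extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        // The StreamRecord's toString() includes the element timestamp.
        System.out.println("ELEMENT: " + element);
        output.collect(element);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // Let the base class forward the watermark downstream, then log it.
        super.processWatermark(mark);
        System.out.println("WM: " + mark);
    }
}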
You can use it like this:
input.transform("WatermarkDebugger", input.getType(),
        new WatermarkDebugger<Tuple2<String, Integer>>());
That should give us something to work with.
Cheers,
Aljoscha
On Mon, 5 Dec 2016 at 18:54 Robert Metzger <[email protected]> wrote:
I'll add Aljoscha and Kostas Kloudas to the conversation. They have the
best overview of the changes to the window operator between 1.1 and 1.2.
On Mon, Dec 5, 2016 at 11:33 AM, Yassine MARZOUGUI <
[email protected]> wrote:
I forgot to mention: the watermark extractor is the one included in the
Flink API.
2016-12-05 11:31 GMT+01:00 Yassine MARZOUGUI <[email protected]>:
Hi Robert,
Yes, I am using the same code, just switching the version in pom.xml to
1.2-SNAPSHOT and the cluster binaries to the compiled latest master (at
the time of the question). Here is the watermark assignment:
.assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Tuple3<Long, String, String>>() {
    @Override
    public long extractAscendingTimestamp(Tuple3<Long, String, String> tuple3) {
        return tuple3.f0;
    }
})
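For reference, here is a rough plain-Java sketch of how I understand an ascending-timestamp extractor to derive its watermark: it remembers the highest timestamp seen so far and reports that value minus one whenever a watermark is requested. The class and method names (`AscendingWatermarkSketch`, `Tracker`, `watermarksFor`) are made up for illustration and are not Flink classes, and the "minus one" rule is my assumption about the built-in extractor rather than something confirmed in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free illustration; not the actual Flink implementation.
final class AscendingWatermarkSketch {

    /** Tracks the highest timestamp seen so far. */
    static final class Tracker {
        private long currentTimestamp = Long.MIN_VALUE;

        /** Records a timestamp; returns false if it breaks ascending order. */
        boolean observe(long timestamp) {
            if (timestamp >= currentTimestamp) {
                currentTimestamp = timestamp;
                return true;
            }
            // Out-of-order element: keep the previous maximum.
            return false;
        }

        /** Watermark a periodic emitter would report at this moment (assumed rule). */
        long currentWatermark() {
            return currentTimestamp == Long.MIN_VALUE
                    ? Long.MIN_VALUE
                    : currentTimestamp - 1;
        }
    }

    /** Returns the watermark that would be reported after each element. */
    static List<Long> watermarksFor(long[] timestamps) {
        Tracker tracker = new Tracker();
        List<Long> result = new ArrayList<>();
        for (long ts : timestamps) {
            tracker.observe(ts);
            result.add(tracker.currentWatermark());
        }
        return result;
    }

    public static void main(String[] args) {
        long[] timestamps = {1000L, 2000L, 1500L, 3000L};
        System.out.println(watermarksFor(timestamps));
    }
}
```

If the debugger output shows watermarks that stay far behind the element timestamps, that would suggest the watermarks are not being emitted as the elements flow through, rather than the extractor computing them wrongly.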
Best,
Yassine
2016-12-05 11:24 GMT+01:00 Robert Metzger <[email protected]>:
Hi Yassine,
are you sure your watermark extractor is the same between the two versions?
It sounds a bit like the watermarks for the 1.2 code are not generated
correctly.
Regards,
Robert
On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI <[email protected]> wrote:
Hi all,
With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the window
boundaries are detected, but with 1.2-SNAPSHOT the state keeps increasing
in memory and the window results are not emitted until the whole stream is
processed. Is this a temporary behaviour due to the developments in
1.2-SNAPSHOT, or a bug?
I am using code similar to the following:
env.setParallelism(1);
DataStream<T> sessions = env
    .readTextFile()
    .flatMap()
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
    .keyBy(1)
    .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
    .apply().setParallelism(32);
sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();
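To make the expected behaviour concrete, here is a small Flink-free simulation of what I mean by "fire as soon as the window boundaries are detected". It is only a sketch for a single key with ascending timestamps. It assumes the watermark trails each element by one, that a session closes once the watermark reaches its end minus one, and that an element landing exactly on the session end still merges into it. `SessionFiringSketch` and `simulate` are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-key simulation; not the Flink window operator.
final class SessionFiringSketch {

    /**
     * Walks ascending timestamps and reports each session as
     * "[start,end) count=n firedAt=w", where w is the watermark
     * (element timestamp - 1, an assumption) that closed the session.
     */
    static List<String> simulate(long[] timestamps, long gap) {
        List<String> fired = new ArrayList<>();
        boolean open = false;
        long start = 0L;
        long end = 0L;
        int count = 0;

        for (long t : timestamps) {
            if (open && t <= end) {
                // Element falls within the gap: extend the current session.
                end = t + gap;
                count++;
            } else {
                if (open) {
                    // The watermark t - 1 has passed end - 1, so the session fires now.
                    fired.add(describe(start, end, count, String.valueOf(t - 1)));
                }
                // Start a new session covering [t, t + gap).
                start = t;
                end = t + gap;
                count = 1;
                open = true;
            }
        }
        if (open) {
            // A finite input ends with a final watermark that flushes the last session.
            fired.add(describe(start, end, count, "end-of-input"));
        }
        return fired;
    }

    private static String describe(long start, long end, int count, String firedAt) {
        return "[" + start + "," + end + ") count=" + count + " firedAt=" + firedAt;
    }

    public static void main(String[] args) {
        long[] timestamps = {0L, 2L, 4L, 20L, 22L, 40L};
        for (String line : simulate(timestamps, 5L)) {
            System.out.println(line);
        }
    }
}
```

In this model only the last session waits for the end of the input. What I observe with 1.2-SNAPSHOT looks as if every session were waiting for that final watermark.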
Best,
Yassine