Hi,
could you please try adding this custom watermark debugger to see what's
going on with the element timestamps and watermarks:

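// Imports needed at the top of the enclosing file:
// import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
// import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
// import org.apache.flink.streaming.api.watermark.Watermark;
// import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public static class WatermarkDebugger<T>
        extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {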
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        System.out.println("ELEMENT: " + element);
        output.collect(element);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        System.out.println("WM: " + mark);
    }
}

you can use it like this:
input.transform("WatermarkDebugger", input.getType(),
        new WatermarkDebugger<Tuple2<String, Integer>>());

That should give us something to work with.

Cheers,
Aljoscha

On Mon, 5 Dec 2016 at 18:54 Robert Metzger <rmetz...@apache.org> wrote:

I'll add Aljoscha and Kostas Kloudas to the conversation. They have the
best overview of the changes to the window operator between 1.1 and 1.2.

On Mon, Dec 5, 2016 at 11:33 AM, Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:

I forgot to mention: the watermark extractor is the one included in the Flink
API.

2016-12-05 11:31 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

Hi Robert,

Yes, I am using the same code, just switching the version in pom.xml to
1.2-SNAPSHOT and the cluster binaries to the latest compiled master (at
the time of the question). Here is the watermark assignment:

.assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Tuple3<Long, String, String>>() {
    @Override
    public long extractAscendingTimestamp(Tuple3<Long, String, String> tuple3) {
        return tuple3.f0;
    }
})
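
For reference when reading the debugger output, here is a minimal plain-Java sketch (no Flink dependency) of how watermarks would be expected to follow ascending timestamps. It assumes the built-in extractor emits a watermark one millisecond behind the largest timestamp seen so far; the class and method names are hypothetical and this is not the Flink implementation itself.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of watermark generation for ascending timestamps. */
final class AscendingWatermarkSketch {

    // Largest timestamp seen so far; Long.MIN_VALUE means "nothing seen yet".
    private long currentTimestamp = Long.MIN_VALUE;

    /** Records a timestamp; returns false if it breaks the ascending order. */
    boolean observe(long timestamp) {
        if (timestamp >= currentTimestamp) {
            currentTimestamp = timestamp;
            return true;
        }
        return false; // out of order: the largest timestamp is kept unchanged
    }

    /** Assumed rule: the watermark trails the largest timestamp by 1 ms. */
    long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : currentTimestamp - 1;
    }

    /** Returns the watermark that would be current after each timestamp. */
    static List<Long> watermarksFor(long... timestamps) {
        AscendingWatermarkSketch sketch = new AscendingWatermarkSketch();
        List<Long> watermarks = new ArrayList<>();
        for (long ts : timestamps) {
            sketch.observe(ts);
            watermarks.add(sketch.currentWatermark());
        }
        return watermarks;
    }
}
```

Under that assumption, the "WM:" lines printed downstream of the extractor should advance steadily as tuple3.f0 grows. If they stay at Long.MIN_VALUE until the input ends, the watermarks are probably not reaching the window operator.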

Best,
Yassine

2016-12-05 11:24 GMT+01:00 Robert Metzger <rmetz...@apache.org>:

Hi Yassine,
are you sure your watermark extractor is the same between the two versions?
It sounds a bit like the watermarks for the 1.2 code are not generated
correctly.

Regards,
Robert


On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:

Hi all,

With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the window
boundaries are detected, but with 1.2-SNAPSHOT the state keeps growing
in memory and the window results are not emitted until the whole stream has
been processed. Is this temporary behaviour due to ongoing development in
1.2-SNAPSHOT, or a bug?

I am using code similar to the following:

env.setParallelism(1);

DataStream<T> sessions = env
    .readTextFile()
    .flatMap()
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
    .keyBy(1)
    .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
    .apply().setParallelism(32);

sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();
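
To make the expected behaviour concrete, here is a rough plain-Java model (no Flink dependency) of when session windows for a single key should fire. It is only a sketch under stated assumptions: timestamps are ascending, the watermark after each element is that element's timestamp minus one, a window [start, end) fires once the watermark reaches end - 1, and a final Long.MAX_VALUE watermark arrives when the input ends. The class and method names are hypothetical, and window merging is simplified compared to the real operator.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical single-key model of event-time session window firing. */
final class SessionFiringSketch {

    /** Returns one entry per fired window, formatted "[start,end)@watermark". */
    static List<String> simulate(long gap, long... ascendingTimestamps) {
        List<long[]> open = new ArrayList<>(); // each entry is {start, end}
        List<String> fired = new ArrayList<>();
        for (long ts : ascendingTimestamps) {
            long[] last = open.isEmpty() ? null : open.get(open.size() - 1);
            if (last != null && ts < last[1]) {
                // Element falls inside the newest session: extend its end.
                last[1] = Math.max(last[1], ts + gap);
            } else {
                // Gap exceeded (or first element): start a new session.
                open.add(new long[] {ts, ts + gap});
            }
            // Assumed watermark after this element: timestamp - 1.
            fireUpTo(ts - 1, open, fired);
        }
        // End of a finite input: a final watermark flushes what is left.
        fireUpTo(Long.MAX_VALUE, open, fired);
        return fired;
    }

    /** Fires and removes every open window whose last timestamp is covered. */
    private static void fireUpTo(long watermark, List<long[]> open,
                                 List<String> fired) {
        Iterator<long[]> it = open.iterator();
        while (it.hasNext()) {
            long[] w = it.next();
            if (w[1] - 1 <= watermark) {
                fired.add("[" + w[0] + "," + w[1] + ")@" + watermark);
                it.remove();
            }
        }
    }
}
```

With a gap of 5 and timestamps 0, 2, 4, 20, 22, 40, this model fires [0,9) when the element at 20 arrives and [20,27) when the element at 40 arrives; only [40,45) waits for the end of the input. What I see with 1.2-SNAPSHOT looks as if only the final step ran, with every window firing at the very end.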

Best,
Yassine
