Hi robert,

Yes, I am using the same code, just swithcing the version in pom.xml to
1.2-SNAPSHOT and the cluster binaries to the compiled lastest master (at
the time of the question)). Here is the watermark assignment :

.assignTimestampsAndWatermarks(new
AscendingTimestampExtractor<Tuple3<Long,String,String>>() {
    @Override
        public long extractAscendingTimestamp(Tuple3<Long,String,String>
tuple3) {
            return tuple3.f0;
        }
})

Best,
Yassine

2016-12-05 11:24 GMT+01:00 Robert Metzger <rmetz...@apache.org>:

> Hi Yassine,
> are you sure your watermark extractor is the same between the two
> versions. It sounds a bit like the watermarks for the 1.2 code are not
> generated correctly.
>
> Regards,
> Robert
>
>
> On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI <
> y.marzou...@mindlytix.com> wrote:
>
>> Hi all,
>>
>> With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the windows
>> boundaries are detected, but with 1.2-SNAPDHOT the state keeps increasing
>> in memory and the windows results are not emitted until the whole stream is
>> processed. Is this a temporary behaviour due to the developments in
>> 1.2-SNAPSHOT, or a bug?
>>
>> I am using a code similar to the follwoing:
>>
>> env.setParallelism(1);
>>
>> DataStream<T> sessions = env
>>     .readTextFile()
>>     .flatMap()
>>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
>>     .keyBy(1)
>>     .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
>>     .apply().setParallelism(32)
>>
>> sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
>> sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();
>>
>> Best,
>> Yassine
>>
>
>

Reply via email to