Hi Aljoscha,

thanks for your answer. Yes, I am using the same TimestampExtractor class.
The timestamps look good to me. Here is an example:

    {"time": 1447666537260, ...}

And parsed: 2015-11-16T10:35:37.260+01:00

The order is now:

    stream
        .map(dummyMapper)
        .assignTimestamps(...)
        .timeWindow(...)

Is there a way to print out the assigned timestamps after
stream.assignTimestamps(...)?

Cheers,

Konstantin

On 16.11.2015 10:31, Aljoscha Krettek wrote:
> Hi,
> are you also using the timestamp extractor when you are using
> env.fromCollection()?
>
> Could you maybe insert a dummy mapper after the Kafka source that just prints
> the element and forwards it? To see if the elements come with a good
> timestamp from Kafka.
>
> Cheers,
> Aljoscha
>> On 15 Nov 2015, at 22:55, Konstantin Knauf <konstantin.kn...@tngtech.com>
>> wrote:
>>
>> Hi everyone,
>>
>> I have the following issue with Flink (0.10) and Kafka.
>>
>> I am using a very simple TimestampExtractor like [1], which just
>> extracts a millis timestamp from a POJO. In my streaming job, I read in
>> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>
>>     stream = env.addSource(new FlinkKafkaConsumer082<>(
>>         parameterTool.getRequired("topic"),
>>         new AvroPojoDeserializationSchema(),
>>         parameterTool.getProperties()))
>>
>> I have timestampEnabled() and the TimeCharacteristic is EventTime,
>> AutoWatermarkInterval is 500.
>>
>> The problem is, when I do something like:
>>
>>     stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>         .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>>         .sum(..)
>>         .print()
>>
>>     env.execute();
>>
>> the windows never get triggered.
>>
>> If I use ProcessingTime it works.
>> If I use env.fromCollection(...) instead of the KafkaSource it works
>> with EventTime, too.
>>
>> Any ideas what I could be doing wrong are highly appreciated.
>>
>> Cheers,
>>
>> Konstantin
>>
>> [1]:
>>
>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>>
>>     private final long maxDelay;
>>
>>     public PojoTimestampExtractor(long maxDelay) {
>>         this.maxDelay = maxDelay;
>>     }
>>
>>     @Override
>>     public long extractTimestamp(Pojo pojo, long l) {
>>         return pojo.getTime();
>>     }
>>
>>     @Override
>>     public long extractWatermark(Pojo pojo, long l) {
>>         return pojo.getTime() - maxDelay;
>>     }
>>
>>     @Override
>>     public long getCurrentWatermark() {
>>         return Long.MIN_VALUE;
>>     }
>> }
>

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Registered office: Unterföhring * Amtsgericht München * HRB 135082
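
As a debugging aid for the question about printing timestamps, here is a minimal plain-Java sketch with no Flink dependency. It formats a millisecond timestamp and the watermark derived from it the same way the quoted extractor does (timestamp minus maxDelay). The class name TimestampDebug, its method names, and the fixed +01:00 offset are assumptions made for illustration and are not part of Flink. Something like describe(...) could be called from the dummy mapper on each element before forwarding it.

```java
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical helper, not part of Flink: formats event timestamps for debugging.
public class TimestampDebug {

    // Assumption: a fixed +01:00 offset, matching the parsed example in the mail.
    public static final ZoneOffset OFFSET = ZoneOffset.ofHours(1);

    // Renders epoch milliseconds as an ISO-8601 date-time at the fixed offset.
    public static String format(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atOffset(OFFSET).toString();
    }

    // Same arithmetic as extractWatermark in the quoted extractor.
    public static long watermark(long epochMillis, long maxDelay) {
        return epochMillis - maxDelay;
    }

    // One line per element, suitable for printing inside a pass-through mapper.
    public static String describe(long epochMillis, long maxDelay) {
        return "timestamp=" + format(epochMillis)
                + " watermark=" + format(watermark(epochMillis, maxDelay));
    }

    public static void main(String[] args) {
        // Values taken from the mail: the example "time" field and maxDelay 6000.
        System.out.println(describe(1447666537260L, 6000L));
    }
}
```

With the values from the mail, this prints timestamp=2015-11-16T10:35:37.260+01:00 and a watermark six seconds earlier, which matches the parsed example above.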