Hi, are you also using the timestamp extractor when you are using env.fromCollection()?
Could you maybe insert a dummy mapper after the Kafka source that just prints the element and forwards it? That would show whether the elements arrive from Kafka with a good timestamp.

Cheers,
Aljoscha

> On 15 Nov 2015, at 22:55, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
>
> Hi everyone,
>
> I have the following issue with Flink (0.10) and Kafka.
>
> I am using a very simple TimestampExtractor like [1], which just
> extracts a millis timestamp from a POJO. In my streaming job, I read in
> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>
>     stream = env.addSource(new FlinkKafkaConsumer082<>(
>         parameterTool.getRequired("topic"),
>         new AvroPojoDeserializationSchema(),
>         parameterTool.getProperties()));
>
> I have timestampEnabled() and the TimeCharacteristics are EventTime,
> AutoWatermarkInterval is 500.
>
> The problem is, when I do something like:
>
>     stream.assignTimestamps(new PojoTimestampExtractor(6000))
>         .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>         .sum(..)
>         .print();
>
>     env.execute();
>
> the windows never get triggered.
>
> If I use ProcessingTime it works.
> If I use env.fromCollection(...) instead of the KafkaSource it works
> with EventTime, too.
>
> Any ideas what I could be doing wrong are highly appreciated.
>
> Cheers,
>
> Konstantin
>
> [1]:
>
> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>
>     final private long maxDelay;
>
>     public PojoTimestampExtractor(long maxDelay) {
>         this.maxDelay = maxDelay;
>     }
>
>     @Override
>     public long extractTimestamp(Pojo pojo, long l) {
>         return pojo.getTime();
>     }
>
>     @Override
>     public long extractWatermark(Pojo pojo, long l) {
>         return pojo.getTime() - maxDelay;
>     }
>
>     @Override
>     public long getCurrentWatermark() {
>         return Long.MIN_VALUE;
>     }
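
For reference, the dummy mapper suggested in the reply could look roughly like the sketch below. It is only an illustration, not code from the thread. The names PassThroughSketch and PrintAndForward are made up, the Pojo class is a hypothetical stand-in that keeps only the getTime() accessor seen in [1], and the small MapFunction interface is a local copy of the single-method shape of Flink's org.apache.flink.api.common.functions.MapFunction, so that the sketch compiles without Flink on the classpath.

```java
public class PassThroughSketch {

    // Local stand-in (assumption): mirrors the single-method shape of
    // Flink's MapFunction so this file compiles without Flink.
    interface MapFunction<T, O> {
        O map(T value) throws Exception;
    }

    // Hypothetical event type; only getTime() is taken from the thread.
    static class Pojo {
        private final long time;

        Pojo(long time) {
            this.time = time;
        }

        long getTime() {
            return time;
        }
    }

    // Prints the event time of each element and forwards the same
    // element unchanged, so downstream operators see identical input.
    static class PrintAndForward implements MapFunction<Pojo, Pojo> {
        @Override
        public Pojo map(Pojo value) {
            System.out.println("from source: time=" + value.getTime());
            return value;
        }
    }

    public static void main(String[] args) throws Exception {
        PrintAndForward mapper = new PrintAndForward();
        Pojo in = new Pojo(1447624500000L); // an arbitrary millis timestamp
        Pojo out = mapper.map(in);
        System.out.println("same instance forwarded: " + (out == in));
    }
}
```

In the actual job, the mapper would implement Flink's own MapFunction<Pojo, Pojo> and be placed directly after the source, for example env.addSource(...).map(new PrintAndForward()), ahead of assignTimestamps(...). If the printed times look wrong or nothing is printed at all, the problem is upstream of the windowing.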