Could this part of the extractor be the problem Aljoscha? @Override public long getCurrentWatermark() { return Long.MIN_VALUE; }
Gyula Konstantin Knauf <konstantin.kn...@tngtech.com> ezt írta (időpont: 2015. nov. 16., H, 10:39): > Hi Aljoscha, > > thanks for your answer. Yes I am using the same TimestampExtractor-Class. > > The timestamps look good to me. Here an example. > > {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00 > > The order now is > > stream > .map(dummyMapper) > .assignTimestamps(...) > .timeWindow(...) > > Is there a way to print out the assigned timestamps after > stream.assignTimestamps(...)? > > Cheers, > > Konstantin > > > On 16.11.2015 10:31, Aljoscha Krettek wrote: > > Hi, > > are you also using the timestamp extractor when you are using > env.fromCollection(). > > > > Could you maybe insert a dummy mapper after the Kafka source that just > prints the element and forwards it? To see if the elements come with a good > timestamp from Kafka. > > > > Cheers, > > Aljoscha > >> On 15 Nov 2015, at 22:55, Konstantin Knauf < > konstantin.kn...@tngtech.com> wrote: > >> > >> Hi everyone, > >> > >> I have the following issue with Flink (0.10) and Kafka. > >> > >> I am using a very simple TimestampExtractor like [1], which just > >> extracts a millis timestamp from a POJO. In my streaming job, I read in > >> these POJOs from Kafka using the FlinkKafkaConsumer082 like this: > >> > >> stream = env.addSource(new FlinkKafkaConsumer082< > >> (parameterTool.getRequired("topic"), > >> new AvroPojoDeserializationSchema(), > >> parameterTool.getProperties())) > >> > >> I have timestampEnabled() and the TimeCharacteristics are EventTime, > >> AutoWatermarkIntervall is 500. > >> > >> The problem is, when I do something like: > >> > >> stream.assignTimestamps(new PojoTimestampExtractor(6000)) > >> .timeWindowAll(Time.of(1, TimeUnit.SECONDS) > >> .sum(..) > >> .print() > >> > >> env.execute(); > >> > >> the windows never get triggered. > >> > >> If I use ProcessingTime it works. > >> If I use env.fromCollection(...) instead of the KafkaSource it works > >> with EventTime, too. > >> > >> Any ideas what I could be doing wrong are highly appreciated. > >> > >> Cheers, > >> > >> Konstantin > >> > >> [1]: > >> > >> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> > { > >> > >> final private long maxDelay; > >> > >> public PojoTimestampExtractor(long maxDelay) { > >> this.maxDelay = maxDelay; > >> } > >> > >> @Override > >> public long extractTimestamp(Pojo fightEvent, long l) { > >> return pojo.getTime(); > >> } > >> > >> @Override > >> public long extractWatermark(Pojo pojo, long l) { > >> return pojo.getTime() - maxDelay; > >> } > >> > >> @Override > >> public long getCurrentWatermark() { > >> return Long.MIN_VALUE; > >> } > > > > > > -- > Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182 > TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring > Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke > Sitz: Unterföhring * Amtsgericht München * HRB 135082 >