Hi,
yes, unfortunately there is a bug in the timestamp extraction operator: when it calls getCurrentWatermark(), it sets the “last-seen watermark” to Long.MIN_VALUE even though it should not.
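To make the bug concrete, here is a simplified, hypothetical model of the watermark bookkeeping that Aljoscha describes further down in this thread. The operator stores the last emitted watermark and only emits a new one when it is larger. This is not Flink's actual operator code. The names WatermarkBookkeeping, Operator, onTimer and the buggy flag are made up for illustration. The model only shows how overwriting the stored value with Long.MIN_VALUE lets a lower watermark be emitted again afterwards. It does not claim to reproduce the exact failure Konstantin saw.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of the timestamp extraction operator's
 * watermark bookkeeping as described in this thread. NOT Flink's real code.
 */
public class WatermarkBookkeeping {

    /** Stand-in for the user extractor: per-element and periodic watermarks. */
    interface Extractor {
        long extractWatermark(long elementTime);
        long getCurrentWatermark();
    }

    static class Operator {
        private final Extractor extractor;
        private final boolean buggy;
        private long lastWatermark = Long.MIN_VALUE; // last emitted watermark
        final List<Long> emitted = new ArrayList<>();

        Operator(Extractor extractor, boolean buggy) {
            this.extractor = extractor;
            this.buggy = buggy;
        }

        /** Called for every element: emit only if larger than the stored value. */
        void processElement(long elementTime) {
            long wm = extractor.extractWatermark(elementTime);
            if (wm > lastWatermark) {
                lastWatermark = wm;
                emitted.add(wm);
            }
        }

        /** Called on every auto-watermark interval. */
        void onTimer() {
            long wm = extractor.getCurrentWatermark();
            if (buggy) {
                // Modeled bug: the stored value is overwritten even when the
                // new value (here Long.MIN_VALUE) is not larger.
                long previous = lastWatermark;
                lastWatermark = wm;
                if (wm > previous) {
                    emitted.add(wm);
                }
            } else if (wm > lastWatermark) {
                lastWatermark = wm;
                emitted.add(wm);
            }
        }
    }

    /** Replays a tiny scenario and returns the emitted watermarks. */
    static List<Long> run(boolean buggy) {
        final long maxDelay = 6000;
        // Mirrors the first extractor in the thread: per-element watermarks,
        // getCurrentWatermark() always returns Long.MIN_VALUE.
        Extractor perElement = new Extractor() {
            @Override
            public long extractWatermark(long elementTime) {
                return elementTime - maxDelay;
            }

            @Override
            public long getCurrentWatermark() {
                return Long.MIN_VALUE;
            }
        };
        Operator op = new Operator(perElement, buggy);
        op.processElement(20_000); // per-element watermark 14000
        op.onTimer();              // periodic call returns Long.MIN_VALUE
        op.processElement(19_000); // out-of-order element: watermark 13000
        return op.emitted;
    }

    public static void main(String[] args) {
        System.out.println("fixed: " + run(false)); // fixed: [14000]
        System.out.println("buggy: " + run(true));  // buggy: [14000, 13000]
    }
}
```

With a maxDelay of 6000, as in the thread, the fixed variant emits only 14000. The buggy variant emits 14000 and then 13000, so in this model the watermark moves backwards.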
I’m opening an issue and adding a fix to the latest master and to the branch for the 0.10.x bugfix releases.

Sorry for the inconvenience.

Cheers,
Aljoscha

> On 16 Nov 2015, at 18:13, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
>
> Hi Aljoscha,
>
> I changed the TimestampExtractor to save the last-seen timestamp and to
> emit watermarks only from getCurrentWatermark() [1], as you suggested. So
> basically I now do the opposite of before (watermarks only from the
> auto-watermark timer instead of only per event). And now it works :). The
> question remains why it did not work before. As far as I can see, it is an
> issue with the first TimestampExtractor itself?!
>
> Does getCurrentWatermark() somehow "overpower" the extracted watermarks?
>
> Cheers,
>
> Konstantin
>
> [1]
>
>     private final long maxDelay;
>     private long lastTimestamp = Long.MIN_VALUE;
>
>     public PojoTimestampExtractor(long maxDelay) {
>         this.maxDelay = maxDelay;
>     }
>
>     @Override
>     public long extractTimestamp(Pojo pojo, long l) {
>         lastTimestamp = pojo.getTime();
>         return pojo.getTime();
>     }
>
>     @Override
>     public long extractWatermark(Pojo pojo, long l) {
>         return Long.MIN_VALUE;
>     }
>
>     @Override
>     public long getCurrentWatermark() {
>         // Guard against overflow: before the first element arrives,
>         // Long.MIN_VALUE - maxDelay would wrap to a huge positive value.
>         if (lastTimestamp == Long.MIN_VALUE) {
>             return Long.MIN_VALUE;
>         }
>         return lastTimestamp - maxDelay;
>     }
>
>
> On 16.11.2015 13:37, Aljoscha Krettek wrote:
>> Hi,
>> yes, at your data rate emitting a watermark for every element should not be
>> a problem. It could become a problem with higher data rates, since the system
>> can get overwhelmed if every element also generates a watermark. In that
>> case I would suggest storing the latest element timestamp in an internal
>> field and only emitting watermarks in getCurrentWatermark(), since then the
>> watermark interval can be tuned using the auto-watermark interval setting.
>>
>> But that should not be the cause of the problem that you currently have.
>> Would you maybe be willing to send me some (mock) example data and the code,
>> to aljoscha at apache.org, so that I can reproduce the problem and have a
>> look at it?
>>
>> Cheers,
>> Aljoscha
>>> On 16 Nov 2015, at 13:05, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
>>>
>>> Hi Aljoscha,
>>>
>>> OK, now I at least understand why it works with fromElements(...). For
>>> the rest I am not so sure.
>>>
>>>> What this means in your case is that the watermark can only advance if
>>>> a new element arrives, because only then is the watermark updated.
>>>
>>> But new elements arrive all the time, about 50/s, or do you mean
>>> something else?
>>>
>>> getCurrentWatermark() returning Long.MIN_VALUE still seems to be an OK
>>> choice, if I understand the semantics correctly. It just affects
>>> watermarking in the absence of events, right?
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>>
>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>>>> Hi,
>>>> it could be what Gyula mentioned. Let me first go a bit into how the
>>>> TimestampExtractor works internally.
>>>>
>>>> First, the timestamp extractor internally keeps the value of the last
>>>> emitted watermark. Then, the semantics of the TimestampExtractor are as
>>>> follows:
>>>> - the result of extractTimestamp is taken and it replaces the internal
>>>>   timestamp of the element
>>>> - if the result of extractWatermark is larger than the last watermark, the
>>>>   new value is emitted as a watermark and the value is stored
>>>> - getCurrentWatermark is called on the specified auto-watermark interval;
>>>>   if the returned value is larger than the last watermark, it is emitted
>>>>   and stored as the last watermark
>>>>
>>>> What this means in your case is that the watermark can only advance if a
>>>> new element arrives, because only then is the watermark updated.
>>>>
>>>> The reason why you see results if you use fromElements is that the
>>>> window operator also emits all the windows that it currently has buffered
>>>> when the program closes. This happens in the case of fromElements because
>>>> only a finite number of elements is emitted, after which the source
>>>> closes, thereby finishing the whole program.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>
>>>>> Could this part of the extractor be the problem, Aljoscha?
>>>>>
>>>>>     @Override
>>>>>     public long getCurrentWatermark() {
>>>>>         return Long.MIN_VALUE;
>>>>>     }
>>>>>
>>>>> Gyula
>>>>>
>>>>> On Mon, 16 Nov 2015 at 10:39, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
>>>>> Hi Aljoscha,
>>>>>
>>>>> thanks for your answer. Yes, I am using the same TimestampExtractor class.
>>>>>
>>>>> The timestamps look good to me. Here is an example:
>>>>>
>>>>> {"time": 1447666537260, ...} and parsed: 2015-11-16T10:35:37.260+01:00
>>>>>
>>>>> The order now is
>>>>>
>>>>>     stream
>>>>>         .map(dummyMapper)
>>>>>         .assignTimestamps(...)
>>>>>         .timeWindow(...)
>>>>>
>>>>> Is there a way to print out the assigned timestamps after
>>>>> stream.assignTimestamps(...)?
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Konstantin
>>>>>
>>>>>
>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>>>>> Hi,
>>>>>> are you also using the timestamp extractor when you are using
>>>>>> env.fromCollection()?
>>>>>>
>>>>>> Could you maybe insert a dummy mapper after the Kafka source that just
>>>>>> prints the element and forwards it? That way we can see whether the
>>>>>> elements come with a good timestamp from Kafka.
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf
>>>>>>> <konstantin.kn...@tngtech.com> wrote:
>>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>>>>
>>>>>>> I am using a very simple TimestampExtractor like [1], which just
>>>>>>> extracts a millisecond timestamp from a POJO. In my streaming job, I
>>>>>>> read these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>>>>
>>>>>>>     stream = env.addSource(new FlinkKafkaConsumer082<>(
>>>>>>>             parameterTool.getRequired("topic"),
>>>>>>>             new AvroPojoDeserializationSchema(),
>>>>>>>             parameterTool.getProperties()));
>>>>>>>
>>>>>>> I have timestampEnabled() set, the TimeCharacteristic is EventTime, and
>>>>>>> the auto-watermark interval is 500 ms.
>>>>>>>
>>>>>>> The problem is, when I do something like:
>>>>>>>
>>>>>>>     stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>>>>           .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>>>>>>>           .sum(..)
>>>>>>>           .print();
>>>>>>>
>>>>>>>     env.execute();
>>>>>>>
>>>>>>> the windows never get triggered.
>>>>>>>
>>>>>>> If I use ProcessingTime, it works.
>>>>>>> If I use env.fromCollection(...) instead of the Kafka source, it works
>>>>>>> with EventTime, too.
>>>>>>>
>>>>>>> Any ideas about what I could be doing wrong are highly appreciated.
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Konstantin
>>>>>>>
>>>>>>> [1]:
>>>>>>>
>>>>>>>     public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>>>>>>>
>>>>>>>         private final long maxDelay;
>>>>>>>
>>>>>>>         public PojoTimestampExtractor(long maxDelay) {
>>>>>>>             this.maxDelay = maxDelay;
>>>>>>>         }
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public long extractTimestamp(Pojo pojo, long l) {
>>>>>>>             return pojo.getTime();
>>>>>>>         }
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public long extractWatermark(Pojo pojo, long l) {
>>>>>>>             return pojo.getTime() - maxDelay;
>>>>>>>         }
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public long getCurrentWatermark() {
>>>>>>>             return Long.MIN_VALUE;
>>>>>>>         }
>>>>>>>     }
>>>>>
>>>>> --
>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>> Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>> Registered office: Unterföhring * Amtsgericht München * HRB 135082
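For reference, the periodic-watermark variant Konstantin ended up with can be sketched as a self-contained program. TimestampExtractor here is a local stand-in that only mirrors the three methods used in the thread, so the sketch compiles without Flink on the classpath. Pojo is a hypothetical minimal event class. getCurrentWatermark() returns Long.MIN_VALUE until the first element has been seen. Without that guard, an early call would compute Long.MIN_VALUE - maxDelay, which overflows to a large positive value.

```java
/**
 * Self-contained sketch of a periodic-watermark extractor. TimestampExtractor
 * and Pojo are local stand-ins (assumptions), not Flink classes.
 */
public class PeriodicWatermarkSketch {

    /** Local stand-in mirroring the three extractor methods used in the thread. */
    interface TimestampExtractor<T> {
        long extractTimestamp(T element, long currentTimestamp);
        long extractWatermark(T element, long currentTimestamp);
        long getCurrentWatermark();
    }

    /** Hypothetical minimal event type carrying an epoch-millis timestamp. */
    static class Pojo {
        private final long time;
        Pojo(long time) { this.time = time; }
        long getTime() { return time; }
    }

    static class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
        private final long maxDelay;
        private long lastTimestamp = Long.MIN_VALUE; // nothing seen yet

        PojoTimestampExtractor(long maxDelay) { this.maxDelay = maxDelay; }

        @Override
        public long extractTimestamp(Pojo pojo, long currentTimestamp) {
            // Remember the latest timestamp; the watermark is derived from it
            // only when the periodic timer calls getCurrentWatermark().
            lastTimestamp = pojo.getTime();
            return pojo.getTime();
        }

        @Override
        public long extractWatermark(Pojo pojo, long currentTimestamp) {
            // No per-element watermarks: Long.MIN_VALUE is never larger than the
            // last emitted watermark, so nothing is emitted from here.
            return Long.MIN_VALUE;
        }

        @Override
        public long getCurrentWatermark() {
            // Before the first element, Long.MIN_VALUE - maxDelay would
            // overflow to a huge positive value, so report "no watermark yet".
            if (lastTimestamp == Long.MIN_VALUE) {
                return Long.MIN_VALUE;
            }
            return lastTimestamp - maxDelay;
        }
    }

    public static void main(String[] args) {
        PojoTimestampExtractor ex = new PojoTimestampExtractor(6000);
        System.out.println(ex.getCurrentWatermark() == Long.MIN_VALUE); // true
        ex.extractTimestamp(new Pojo(1447666537260L), Long.MIN_VALUE);
        System.out.println(ex.getCurrentWatermark()); // 1447666531260
    }
}
```

In a real job, getCurrentWatermark() would be called on every auto-watermark interval (500 ms in Konstantin's setup). That interval, rather than the data rate, then controls how often watermarks are emitted.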