Hi Aljoscha,

I changed the TimestampExtractor to store the last seen timestamp and to emit watermarks only from getCurrentWatermark() [1], as you suggested. So I now do basically the opposite of before: previously watermarks were emitted only per element (via extractWatermark), now they are emitted only on the auto-watermark interval. And now it works :). The question remains why it did not work before. As far as I can see, the problem lies with the first TimestampExtractor itself?!
Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?

Cheers,

Konstantin

[1]

public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {

    final private long maxDelay;
    // last element timestamp seen; stays Long.MIN_VALUE until the first element arrives
    private long lastTimestamp = Long.MIN_VALUE;

    public PojoTimestampExtractor(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    @Override
    public long extractTimestamp(Pojo pojo, long l) {
        lastTimestamp = pojo.getTime();
        return pojo.getTime();
    }

    @Override
    public long extractWatermark(Pojo pojo, long l) {
        // no per-element watermarks
        return Long.MIN_VALUE;
    }

    @Override
    public long getCurrentWatermark() {
        // called on the auto-watermark interval
        return lastTimestamp - maxDelay;
    }
}

On 16.11.2015 13:37, Aljoscha Krettek wrote:
> Hi,
> yes, at your data rate emitting a watermark for every element should not be a
> problem. It could become a problem with higher data rates, since the system
> can get overwhelmed if every element also generates a watermark. In that case
> I would suggest storing the latest element timestamp in an internal field
> and emitting watermarks only in getCurrentWatermark(), since then the
> watermark interval can be tuned using the auto-watermark interval setting.
>
> But that should not be the cause of the problem you currently have.
> Would you maybe be willing to send me some (mock) example data and the code
> so that I can reproduce the problem and have a look at it? To aljoscha at
> apache.org.
>
> Cheers,
> Aljoscha
>> On 16 Nov 2015, at 13:05, Konstantin Knauf <konstantin.kn...@tngtech.com>
>> wrote:
>>
>> Hi Aljoscha,
>>
>> OK, now I at least understand why it works with fromElements(...). For
>> the rest I am not so sure.
>>
>>> What this means in your case is that the watermark can only advance if
>>> a new element arrives, because only then is the watermark updated.
>>
>> But new elements arrive all the time, about 50/s, or do you mean
>> something else?
>>
>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an OK
>> choice, if I understand the semantics correctly. It just affects
>> watermarking in the absence of events, right?
>>
>> Cheers,
>>
>> Konstantin
>>
>>
>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>>> Hi,
>>> it could be what Gyula mentioned. Let me first explain a bit how the
>>> TimestampExtractor works internally.
>>>
>>> First, the timestamp extractor internally keeps the value of the last
>>> emitted watermark. The semantics of the TimestampExtractor are then as
>>> follows:
>>> - the result of extractTimestamp is taken and replaces the internal
>>>   timestamp of the element
>>> - if the result of extractWatermark is larger than the last watermark, the
>>>   new value is emitted as a watermark and stored
>>> - getCurrentWatermark is called at the specified auto-watermark interval;
>>>   if the returned value is larger than the last watermark, it is emitted
>>>   and stored as the last watermark
>>>
>>> What this means in your case is that the watermark can only advance if a
>>> new element arrives, because only then is the watermark updated.
>>>
>>> The reason why you see results if you use fromElements is that the
>>> window operator also emits all the windows that it currently has buffered
>>> when the program closes. This happens in the case of fromElements because
>>> only a finite number of elements is emitted, after which the source closes,
>>> thereby finishing the whole program.
>>>
>>> Cheers,
>>> Aljoscha
>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>
>>>> Could this part of the extractor be the problem, Aljoscha?
>>>>
>>>> @Override
>>>> public long getCurrentWatermark() {
>>>>     return Long.MIN_VALUE;
>>>> }
>>>>
>>>> Gyula
>>>>
>>>> Konstantin Knauf <konstantin.kn...@tngtech.com> wrote (on Mon, 16 Nov
>>>> 2015, 10:39):
>>>> Hi Aljoscha,
>>>>
>>>> thanks for your answer. Yes, I am using the same TimestampExtractor class.
>>>>
>>>> The timestamps look good to me. Here is an example.
>>>>
>>>> {"time": 1447666537260, ...} and parsed: 2015-11-16T10:35:37.260+01:00
>>>>
>>>> The order now is
>>>>
>>>> stream
>>>>     .map(dummyMapper)
>>>>     .assignTimestamps(...)
>>>>     .timeWindow(...)
>>>>
>>>> Is there a way to print out the assigned timestamps after
>>>> stream.assignTimestamps(...)?
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>>
>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>>>> Hi,
>>>>> are you also using the timestamp extractor when you are using
>>>>> env.fromCollection()?
>>>>>
>>>>> Could you maybe insert a dummy mapper after the Kafka source that just
>>>>> prints the element and forwards it? That way we can see whether the
>>>>> elements come from Kafka with a good timestamp.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf
>>>>>> <konstantin.kn...@tngtech.com> wrote:
>>>>>>
>>>>>> Hi everyone,
>>>>>>
>>>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>>>
>>>>>> I am using a very simple TimestampExtractor like [1], which just
>>>>>> extracts a millisecond timestamp from a POJO. In my streaming job, I
>>>>>> read these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>>>
>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<>(
>>>>>>         parameterTool.getRequired("topic"),
>>>>>>         new AvroPojoDeserializationSchema(),
>>>>>>         parameterTool.getProperties()));
>>>>>>
>>>>>> I have timestampEnabled(), the TimeCharacteristic is EventTime, and
>>>>>> the auto-watermark interval is 500.
>>>>>>
>>>>>> The problem is, when I do something like:
>>>>>>
>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>>>         .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>>>>>>         .sum(..)
>>>>>>         .print();
>>>>>>
>>>>>> env.execute();
>>>>>>
>>>>>> the windows never get triggered.
>>>>>>
>>>>>> If I use ProcessingTime, it works.
>>>>>> If I use env.fromCollection(...) instead of the Kafka source, it works
>>>>>> with EventTime, too.
>>>>>>
>>>>>> Any ideas about what I could be doing wrong are highly appreciated.
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Konstantin
>>>>>>
>>>>>> [1]:
>>>>>>
>>>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>>>>>>
>>>>>>     final private long maxDelay;
>>>>>>
>>>>>>     public PojoTimestampExtractor(long maxDelay) {
>>>>>>         this.maxDelay = maxDelay;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public long extractTimestamp(Pojo pojo, long l) {
>>>>>>         return pojo.getTime();
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public long extractWatermark(Pojo pojo, long l) {
>>>>>>         return pojo.getTime() - maxDelay;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public long getCurrentWatermark() {
>>>>>>         return Long.MIN_VALUE;
>>>>>>     }
>>>>>> }
>>>>>
>>>>>
>>>>
>>>> -- 
>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>> Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>> Registered office: Unterföhring * Amtsgericht München * HRB 135082
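To make the three emission rules Aljoscha lists concrete, here is a small, Flink-free Java model that replays both of Konstantin's extractors against them. It is a sketch under stated assumptions: `Extractor`, `ExtractorOperator`, `perElement` and `periodic` are hypothetical names for illustration, each element is reduced to its event-time timestamp, and the operator logic follows the description in the thread rather than Flink's actual source code. Modeling the revised extractor also makes one arithmetic detail visible: before the first element arrives, `lastTimestamp - maxDelay` is `Long.MIN_VALUE - 6000`, which wraps around in Java's `long` arithmetic to a very large positive value, so an instance that has not yet seen an element reports a near-maximal watermark on its first timer tick. Whether that is related to the behavior change discussed above is not established here.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkSemanticsSketch {

    // Hypothetical stand-in for TimestampExtractor<T>: only the three methods discussed
    // in the thread, with each element reduced to its event-time timestamp (a long).
    interface Extractor {
        long extractTimestamp(long elementTime);
        long extractWatermark(long elementTime);
        long getCurrentWatermark();
    }

    // Hypothetical model of the rules described in the thread (not Flink's operator code):
    // remember the last emitted watermark and emit a candidate only if it is larger.
    static final class ExtractorOperator {
        private final Extractor extractor;
        private long lastWatermark = Long.MIN_VALUE;
        final List<Long> emitted = new ArrayList<>();

        ExtractorOperator(Extractor extractor) {
            this.extractor = extractor;
        }

        // Per element: assign the timestamp, then offer extractWatermark's result.
        void processElement(long elementTime) {
            extractor.extractTimestamp(elementTime); // the real operator attaches this to the element
            maybeEmit(extractor.extractWatermark(elementTime));
        }

        // Called once per auto-watermark interval: offer getCurrentWatermark's result.
        void onAutoWatermarkTimer() {
            maybeEmit(extractor.getCurrentWatermark());
        }

        private void maybeEmit(long candidate) {
            if (candidate > lastWatermark) {
                lastWatermark = candidate;
                emitted.add(candidate);
            }
        }
    }

    // Mirrors the first extractor: a watermark per element, the timer never advances it.
    static Extractor perElement(long maxDelay) {
        return new Extractor() {
            @Override public long extractTimestamp(long t) { return t; }
            @Override public long extractWatermark(long t) { return t - maxDelay; }
            @Override public long getCurrentWatermark() { return Long.MIN_VALUE; }
        };
    }

    // Mirrors the revised extractor: remember the last timestamp, report it only on the timer.
    static Extractor periodic(long maxDelay) {
        return new Extractor() {
            private long lastTimestamp = Long.MIN_VALUE;
            @Override public long extractTimestamp(long t) { lastTimestamp = t; return t; }
            @Override public long extractWatermark(long t) { return Long.MIN_VALUE; }
            // Before the first element this is Long.MIN_VALUE - maxDelay, which wraps
            // around to a large positive value in Java's long arithmetic.
            @Override public long getCurrentWatermark() { return lastTimestamp - maxDelay; }
        };
    }

    public static void main(String[] args) {
        long maxDelay = 6000;

        ExtractorOperator first = new ExtractorOperator(perElement(maxDelay));
        first.processElement(10_000);
        first.processElement(12_000);
        first.onAutoWatermarkTimer(); // Long.MIN_VALUE is never larger, so nothing is emitted
        System.out.println("per-element: " + first.emitted); // per-element: [4000, 6000]

        ExtractorOperator revised = new ExtractorOperator(periodic(maxDelay));
        revised.processElement(10_000);
        revised.onAutoWatermarkTimer();
        revised.processElement(12_000);
        revised.onAutoWatermarkTimer();
        System.out.println("periodic: " + revised.emitted); // periodic: [4000, 6000]

        ExtractorOperator idle = new ExtractorOperator(periodic(maxDelay));
        idle.onAutoWatermarkTimer(); // timer fires before any element has been seen
        System.out.println("periodic, idle: " + idle.emitted); // periodic, idle: [9223372036854769808]
    }
}
```

If the wraparound is not intended, one option is to have getCurrentWatermark() return Long.MIN_VALUE until lastTimestamp has actually been set; this is a suggestion for experimentation, not something confirmed in the thread.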