Hi Aljoscha,

Are you sure? I am running the job from my IDE at the moment.

If I set StreamExecutionEnvironment.setParallelism(1), it works with the old
TimestampExtractor (returning Long.MIN_VALUE from getCurrentWatermark() and
emitting a watermark at every record).

If I set StreamExecutionEnvironment.setParallelism(5), it does not work.

So, if I understood you correctly, it is the opposite of what you were
expecting?!

Cheers,

Konstantin

On 17.11.2015 11:32, Aljoscha Krettek wrote:
> Hi,
> actually, the bug is more subtle. Normally, it is not a problem that the
> TimestampExtractor sometimes emits a watermark that is lower than the one
> before. (This is the result of the bug with Long.MIN_VALUE I mentioned
> before.) The stream operators wait for watermarks from all upstream operators
> and only advance the watermark monotonically in lockstep with them. This way,
> the watermark cannot decrease at an operator.
>
> In your case, you have a topology with parallelism 1, I assume. In that case
> the operators are chained. (There are no separate operators but basically only
> one operator, and element transmission happens in function calls.) In this
> setting the watermarks are directly forwarded to operators without going
> through the logic I mentioned above.
>
> Cheers,
> Aljoscha
>> On 16 Nov 2015, at 18:13, Konstantin Knauf <konstantin.kn...@tngtech.com>
>> wrote:
>>
>> Hi Aljoscha,
>>
>> I changed the TimestampExtractor to save the lastSeenTimestamp and only
>> emit with getCurrentWatermark [1] as you suggested. So basically I do
>> the opposite of before (only watermarks per event vs. only watermarks
>> per auto-watermark interval). And now it works :). The question remains
>> why it did not work before. As far as I can see, it is an issue with the
>> first TimestampExtractor itself?!
>>
>> Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?
>>
>> Cheers,
>>
>> Konstantin
>>
>> [1]
>>
>> final private long maxDelay;
>> private long lastTimestamp = Long.MIN_VALUE;
>>
>> public PojoTimestampExtractor(long maxDelay) {
>>     this.maxDelay = maxDelay;
>> }
>>
>> @Override
>> public long extractTimestamp(Pojo pojo, long l) {
>>     lastTimestamp = pojo.getTime();
>>     return pojo.getTime();
>> }
>>
>> @Override
>> public long extractWatermark(Pojo pojo, long l) {
>>     return Long.MIN_VALUE;
>> }
>>
>> @Override
>> public long getCurrentWatermark() {
>>     return lastTimestamp - maxDelay;
>> }
>>
>>
>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
>>> Hi,
>>> yes, at your data rate emitting a watermark for every element should not be
>>> a problem. It could become a problem with higher data rates, since the
>>> system can get overwhelmed if every element also generates a watermark. In
>>> that case I would suggest storing the latest element timestamp in an
>>> internal field and only emitting in getCurrentWatermark(), since then the
>>> watermark interval can be tuned using the auto-watermark interval setting.
>>>
>>> But that should not be the cause of the problem that you currently have.
>>> Would you maybe be willing to send me some (mock) example data and the code
>>> so that I can reproduce the problem and have a look at it? You can send it
>>> to aljoscha at apache.org.
>>>
>>> Cheers,
>>> Aljoscha
>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf <konstantin.kn...@tngtech.com>
>>>> wrote:
>>>>
>>>> Hi Aljoscha,
>>>>
>>>> ok, now I at least understand why it works with fromElements(...). For
>>>> the rest I am not so sure.
>>>>
>>>>> What this means in your case is that the watermark can only advance if
>>>>> a new element arrives, because only then is the watermark updated.
>>>>
>>>> But new elements arrive all the time, about 50/s, or do you mean
>>>> something else?
>>>>
>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an OK
>>>> choice, if I understand the semantics correctly.
>>>> It just affects watermarking in the absence of events, right?
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>>
>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>>>>> Hi,
>>>>> it could be what Gyula mentioned. Let me first go a bit into how the
>>>>> TimestampExtractor works internally.
>>>>>
>>>>> First, the timestamp extractor internally keeps the value of the last
>>>>> emitted watermark. Then, the semantics of the TimestampExtractor are as
>>>>> follows:
>>>>> - the result of extractTimestamp is taken and it replaces the internal
>>>>>   timestamp of the element
>>>>> - if the result of extractWatermark is larger than the last watermark,
>>>>>   the new value is emitted as a watermark and the value is stored
>>>>> - getCurrentWatermark is called at the specified auto-watermark
>>>>>   interval; if the returned value is larger than the last watermark, it
>>>>>   is emitted and stored as the last watermark
>>>>>
>>>>> What this means in your case is that the watermark can only advance if a
>>>>> new element arrives, because only then is the watermark updated.
>>>>>
>>>>> The reason why you see results if you use fromElements is that the
>>>>> window operator also emits all the windows that it currently has buffered
>>>>> when the program closes. This happens in the case of fromElements because
>>>>> only a finite number of elements is emitted, after which the source
>>>>> closes, thereby finishing the whole program.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>
>>>>>> Could this part of the extractor be the problem, Aljoscha?
>>>>>>
>>>>>> @Override
>>>>>> public long getCurrentWatermark() {
>>>>>>     return Long.MIN_VALUE;
>>>>>> }
>>>>>>
>>>>>> Gyula
>>>>>>
>>>>>> Konstantin Knauf <konstantin.kn...@tngtech.com> wrote (on Mon,
>>>>>> 16 Nov 2015, 10:39):
>>>>>> Hi Aljoscha,
>>>>>>
>>>>>> thanks for your answer. Yes, I am using the same TimestampExtractor class.
>>>>>>
>>>>>> The timestamps look good to me. Here is an example.
>>>>>>
>>>>>> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
>>>>>>
>>>>>> The order now is
>>>>>>
>>>>>> stream
>>>>>>     .map(dummyMapper)
>>>>>>     .assignTimestamps(...)
>>>>>>     .timeWindow(...)
>>>>>>
>>>>>> Is there a way to print out the assigned timestamps after
>>>>>> stream.assignTimestamps(...)?
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Konstantin
>>>>>>
>>>>>>
>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>>>>>> Hi,
>>>>>>> are you also using the timestamp extractor when you are using
>>>>>>> env.fromCollection()?
>>>>>>>
>>>>>>> Could you maybe insert a dummy mapper after the Kafka source that just
>>>>>>> prints the element and forwards it? To see if the elements come with a
>>>>>>> good timestamp from Kafka.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf
>>>>>>>> <konstantin.kn...@tngtech.com> wrote:
>>>>>>>>
>>>>>>>> Hi everyone,
>>>>>>>>
>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>>>>>
>>>>>>>> I am using a very simple TimestampExtractor like [1], which just
>>>>>>>> extracts a millis timestamp from a POJO. In my streaming job, I read in
>>>>>>>> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>>>>>
>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<>(
>>>>>>>>     parameterTool.getRequired("topic"),
>>>>>>>>     new AvroPojoDeserializationSchema(),
>>>>>>>>     parameterTool.getProperties()))
>>>>>>>>
>>>>>>>> I have timestampEnabled() and the TimeCharacteristic is EventTime,
>>>>>>>> AutoWatermarkInterval is 500.
>>>>>>>>
>>>>>>>> The problem is, when I do something like:
>>>>>>>>
>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>>>>>     .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>>>>>>>>     .sum(..)
>>>>>>>>     .print()
>>>>>>>>
>>>>>>>> env.execute();
>>>>>>>>
>>>>>>>> the windows never get triggered.
>>>>>>>>
>>>>>>>> If I use ProcessingTime it works.
>>>>>>>> If I use env.fromCollection(...) instead of the KafkaSource it works
>>>>>>>> with EventTime, too.
>>>>>>>>
>>>>>>>> Any ideas what I could be doing wrong are highly appreciated.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>>
>>>>>>>> Konstantin
>>>>>>>>
>>>>>>>> [1]:
>>>>>>>>
>>>>>>>> public class PojoTimestampExtractor implements
>>>>>>>>         TimestampExtractor<Pojo> {
>>>>>>>>
>>>>>>>>     final private long maxDelay;
>>>>>>>>
>>>>>>>>     public PojoTimestampExtractor(long maxDelay) {
>>>>>>>>         this.maxDelay = maxDelay;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public long extractTimestamp(Pojo pojo, long l) {
>>>>>>>>         return pojo.getTime();
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public long extractWatermark(Pojo pojo, long l) {
>>>>>>>>         return pojo.getTime() - maxDelay;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public long getCurrentWatermark() {
>>>>>>>>         return Long.MIN_VALUE;
>>>>>>>>     }
>>>>>>>> }
>>>>>>
>>>>>> --
>>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>> Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>> Registered office: Unterföhring * Amtsgericht München * HRB 135082
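
The watermark rules discussed in this thread can be tried out with a small plain-Java model that needs no Flink dependency. This is only a sketch under assumptions: the names WatermarkSketch, MonotonicEmitter and MultiInputOperator are made up for illustration and are not Flink classes, and the multi-input rule used here (take the minimum over the latest watermark seen on each upstream channel and never move backwards) is one reading of "wait for watermarks from all upstream operators and only advance the watermark monotonically", not a copy of the actual operator code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the watermark rules described in the thread.
// Hypothetical names throughout; this is not Flink code.
class WatermarkSketch {

    // Models the extractor wrapper: a candidate watermark is emitted only
    // if it is larger than the last emitted watermark.
    static final class MonotonicEmitter {
        private long lastEmitted = Long.MIN_VALUE;
        final List<Long> emitted = new ArrayList<>();

        void offer(long candidate) {
            if (candidate > lastEmitted) {
                lastEmitted = candidate;
                emitted.add(candidate);
            }
        }
    }

    // Models a non-chained operator with several upstream channels
    // (assumption: min over inputs, never decreasing).
    static final class MultiInputOperator {
        private final long[] latestPerInput;
        private long current = Long.MIN_VALUE;

        MultiInputOperator(int inputs) {
            latestPerInput = new long[inputs];
            Arrays.fill(latestPerInput, Long.MIN_VALUE);
        }

        // Records a watermark from one input and returns the operator's
        // resulting watermark.
        long onWatermark(int input, long watermark) {
            latestPerInput[input] = watermark;
            long min = Long.MAX_VALUE;
            for (long w : latestPerInput) {
                min = Math.min(min, w);
            }
            if (min > current) {
                current = min;
            }
            return current;
        }
    }

    public static void main(String[] args) {
        long maxDelay = 6_000L;
        long[] timestamps = {10_000L, 12_000L, 11_000L}; // made-up event times

        // First extractor from the thread: per-element watermark of
        // (timestamp - maxDelay), periodic watermark of Long.MIN_VALUE.
        MonotonicEmitter emitter = new MonotonicEmitter();
        for (long ts : timestamps) {
            emitter.offer(ts - maxDelay);   // extractWatermark
            emitter.offer(Long.MIN_VALUE);  // getCurrentWatermark
        }
        System.out.println(emitter.emitted); // [4000, 6000]

        MultiInputOperator op = new MultiInputOperator(2);
        System.out.println(op.onWatermark(0, 5_000L)); // still Long.MIN_VALUE
        System.out.println(op.onWatermark(1, 3_000L)); // 3000
        System.out.println(op.onWatermark(0, 2_000L)); // stays at 3000
    }
}
```

In this model the Long.MIN_VALUE candidates and the out-of-order 5000 are dropped by the emitter, and the two-input operator only advances once both inputs have reported a watermark. It says nothing about the chained case, where according to the thread the watermarks bypass this logic.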