A solution for that is in these two pull requests: https://github.com/apache/flink/pull/1447
https://github.com/apache/flink/pull/1448

On Fri, Dec 4, 2015 at 10:21 PM, Robert Metzger <rmetz...@apache.org> wrote:

> I think we need to find a solution for this problem soon.
> Another user is most likely affected:
> http://stackoverflow.com/q/34090808/568695
>
> I've filed a JIRA for the problem:
> https://issues.apache.org/jira/browse/FLINK-3121
>
>
> On Mon, Nov 30, 2015 at 5:58 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Maybe. In the Kafka case we just need to ensure that parallel instances of the source that know that they don’t have any partitions assigned to them emit Long.MAX_VALUE as a watermark.
>>
>> > On 30 Nov 2015, at 17:50, Gyula Fóra <gyula.f...@gmail.com> wrote:
>> >
>> > Hi,
>> >
>> > I think what we will need at some point for this are approximate watermarks which correlate event and ingest time.
>> >
>> > I think they have similar concepts in Millwheel/Dataflow.
>> >
>> > Cheers,
>> > Gyula
>> > On Mon, Nov 30, 2015 at 5:29 PM Aljoscha Krettek <aljos...@apache.org> wrote:
>> > Hi,
>> > as an addition: I don’t have a solution yet for the general problem of what happens when a parallel instance of a source never receives elements. This watermark business is very tricky...
>> >
>> > Cheers,
>> > Aljoscha
>> > > On 30 Nov 2015, at 17:20, Aljoscha Krettek <aljos...@apache.org> wrote:
>> > >
>> > > Hi Konstantin,
>> > > I finally nailed down the problem. :-)
>> > >
>> > > The basis of the problem is the fact that there is a mismatch between the parallelism of the Flink Kafka Consumer and the number of partitions in the Kafka Stream. I would assume that in your case the Kafka Stream has 1 partition. This means that only one of the parallel instances of the Flink Kafka Consumer ever receives elements, which in turn means that only one of the parallel instances of the timestamp extractor ever receives elements.
>> > > This means that no watermarks get emitted for the other parallel instances, which in turn means that the watermark does not advance downstream, because the watermark at an operator is the minimum over all upstream watermarks. This explains why ExampleTimestampExtractor1 only works in the case with parallelism=1.
>> > >
>> > > The reason why ExampleTimestampExtractor2 works in all parallelism settings is not very obvious. The secret is in this method:
>> > >
>> > > @Override
>> > > public long getCurrentWatermark() {
>> > >     return lastTimestamp - maxDelay;
>> > > }
>> > >
>> > > In the parallel instances that never receive any element, lastTimestamp is still set to its initial value, Long.MIN_VALUE. So “lastTimestamp - maxDelay” overflows and yields (Long.MAX_VALUE - maxDelay + 1). Now, because the watermark at an operator is always the minimum over all watermarks from upstream operators, the watermark at the window operator always tracks the watermark of the parallel instance that receives elements.
>> > >
>> > > I hope this helps, but please let me know if I should provide more explanation. This is a very tricky topic.
>> > >
>> > > Cheers,
>> > > Aljoscha
>> > >
>> > >> On 29 Nov 2015, at 21:18, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
>> > >>
>> > >> Hi Aljoscha,
>> > >>
>> > >> I have put together a gist [1] with two classes: a short processing pipeline, which shows the behavior, and a data generator to write records into Kafka. I hope I remembered everything we discussed correctly.
>> > >>
>> > >> So basically in the example it works with "TimestampExtractor1" only for parallelism 1; with "TimestampExtractor2" it works regardless of the parallelism. Run from the IDE.
>> > >>
>> > >> Let me know if you need anything else.
>> > >>
>> > >> Cheers,
>> > >>
>> > >> Konstantin
>> > >>
>> > >> [1] https://gist.github.com/knaufk/d57b5c3c7db576f3350d
>> > >>
>> > >> On 25.11.2015 21:15, Konstantin Knauf wrote:
>> > >>> Hi Aljoscha,
>> > >>>
>> > >>> sure, will do. I have not found a solution either. I won't have time to put a minimal example together before the weekend though.
>> > >>>
>> > >>> Cheers,
>> > >>>
>> > >>> Konstantin
>> > >>>
>> > >>> On 25.11.2015 19:10, Aljoscha Krettek wrote:
>> > >>>> Hi Konstantin,
>> > >>>> I still didn’t come up with an explanation for the behavior. Could you maybe send me example code (and example data if it is necessary to reproduce the problem)? This would really help me pinpoint the problem.
>> > >>>>
>> > >>>> Cheers,
>> > >>>> Aljoscha
>> > >>>>> On 17 Nov 2015, at 21:42, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
>> > >>>>>
>> > >>>>> Hi Aljoscha,
>> > >>>>>
>> > >>>>> Are you sure? I am running the job from my IDE at the moment.
>> > >>>>>
>> > >>>>> If I set
>> > >>>>>
>> > >>>>> StreamExecutionEnvironment.setParallelism(1);
>> > >>>>>
>> > >>>>> it works with the old TimestampExtractor (returning Long.MIN_VALUE from getCurrentWatermark() and emitting a watermark at every record).
>> > >>>>>
>> > >>>>> If I set
>> > >>>>>
>> > >>>>> StreamExecutionEnvironment.setParallelism(5);
>> > >>>>>
>> > >>>>> it does not work.
>> > >>>>>
>> > >>>>> So, if I understood you correctly, it is the opposite of what you were expecting?!
>> > >>>>>
>> > >>>>> Cheers,
>> > >>>>>
>> > >>>>> Konstantin
>> > >>>>>
>> > >>>>>
>> > >>>>> On 17.11.2015 11:32, Aljoscha Krettek wrote:
>> > >>>>>> Hi,
>> > >>>>>> actually, the bug is more subtle. Normally, it is not a problem that the TimestampExtractor sometimes emits a watermark that is lower than the one before. (This is the result of the bug with Long.MIN_VALUE I mentioned before).
>> > >>>>>> The stream operators wait for watermarks from all upstream operators and only advance the watermark monotonically in lockstep with them. This way, the watermark cannot decrease at an operator.
>> > >>>>>>
>> > >>>>>> In your case, you have a topology with parallelism 1, I assume. In that case the operators are chained. (There are no separate operators but basically only one operator, and element transmission happens in function calls.) In this setting the watermarks are directly forwarded to operators without going through the logic I mentioned above.
>> > >>>>>>
>> > >>>>>> Cheers,
>> > >>>>>> Aljoscha
>> > >>>>>>> On 16 Nov 2015, at 18:13, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
>> > >>>>>>>
>> > >>>>>>> Hi Aljoscha,
>> > >>>>>>>
>> > >>>>>>> I changed the TimestampExtractor to save the lastSeenTimestamp and only emit with getCurrentWatermark [1] as you suggested. So basically I do the opposite of before (only watermarks per event vs. only watermarks per auto-watermark interval). And now it works :). The question remains why it did not work before. As far as I see, it is an issue with the first TimestampExtractor itself?!
>> > >>>>>>>
>> > >>>>>>> Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?
>> > >>>>>>>
>> > >>>>>>> Cheers,
>> > >>>>>>>
>> > >>>>>>> Konstantin
>> > >>>>>>>
>> > >>>>>>> [1]
>> > >>>>>>>
>> > >>>>>>> final private long maxDelay;
>> > >>>>>>> private long lastTimestamp = Long.MIN_VALUE;
>> > >>>>>>>
>> > >>>>>>> public PojoTimestampExtractor(long maxDelay) {
>> > >>>>>>>     this.maxDelay = maxDelay;
>> > >>>>>>> }
>> > >>>>>>>
>> > >>>>>>> @Override
>> > >>>>>>> public long extractTimestamp(Pojo pojo, long l) {
>> > >>>>>>>     lastTimestamp = pojo.getTime();
>> > >>>>>>>     return pojo.getTime();
>> > >>>>>>> }
>> > >>>>>>>
>> > >>>>>>> @Override
>> > >>>>>>> public long extractWatermark(Pojo pojo, long l) {
>> > >>>>>>>     return Long.MIN_VALUE;
>> > >>>>>>> }
>> > >>>>>>>
>> > >>>>>>> @Override
>> > >>>>>>> public long getCurrentWatermark() {
>> > >>>>>>>     return lastTimestamp - maxDelay;
>> > >>>>>>> }
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
>> > >>>>>>>> Hi,
>> > >>>>>>>> yes, at your data-rate emitting a watermark for every element should not be a problem. It could become a problem with higher data-rates since the system can get overwhelmed if every element also generates a watermark. In that case I would suggest storing the latest element-timestamp in an internal field and only emitting in getCurrentWatermark(), since then the watermark interval can be tuned using the auto-watermark interval setting.
>> > >>>>>>>>
>> > >>>>>>>> But that should not be the cause of the problem that you currently have. Would you maybe be willing to send me some (mock) example data and the code so that I can reproduce the problem and have a look at it? You can send it to aljoscha at apache.org.
>> > >>>>>>>>
>> > >>>>>>>> Cheers,
>> > >>>>>>>> Aljoscha
>> > >>>>>>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
>> > >>>>>>>>>
>> > >>>>>>>>> Hi Aljoscha,
>> > >>>>>>>>>
>> > >>>>>>>>> ok, now I at least understand why it works with fromElements(...).
>> > >>>>>>>>> For the rest I am not so sure.
>> > >>>>>>>>>
>> > >>>>>>>>>> What this means in your case is that the watermark can only advance if a new element arrives, because only then is the watermark updated.
>> > >>>>>>>>>
>> > >>>>>>>>> But new elements arrive all the time, about 50/s, or do you mean something else?
>> > >>>>>>>>>
>> > >>>>>>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an ok choice, if I understand the semantics correctly. It just affects watermarking in the absence of events, right?
>> > >>>>>>>>>
>> > >>>>>>>>> Cheers,
>> > >>>>>>>>>
>> > >>>>>>>>> Konstantin
>> > >>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>> > >>>>>>>>>> Hi,
>> > >>>>>>>>>> it could be what Gyula mentioned. Let me first go a bit into how the TimestampExtractor works internally.
>> > >>>>>>>>>>
>> > >>>>>>>>>> First, the timestamp extractor internally keeps the value of the last emitted watermark. Then, the semantics of the TimestampExtractor are as follows:
>> > >>>>>>>>>> - the result of extractTimestamp is taken and it replaces the internal timestamp of the element
>> > >>>>>>>>>> - if the result of extractWatermark is larger than the last watermark, the new value is emitted as a watermark and the value is stored
>> > >>>>>>>>>> - getCurrentWatermark is called on the specified auto-watermark interval; if the returned value is larger than the last watermark, it is emitted and stored as last watermark
>> > >>>>>>>>>>
>> > >>>>>>>>>> What this means in your case is that the watermark can only advance if a new element arrives, because only then is the watermark updated.
>> > >>>>>>>>>>
>> > >>>>>>>>>> The reason why you see results if you use fromElements is that the window-operator also emits all the windows that it currently has buffered if the program closes.
>> > >>>>>>>>>> This happens in the case of fromElements because only a finite number of elements is emitted, after which the source closes, thereby finishing the whole program.
>> > >>>>>>>>>>
>> > >>>>>>>>>> Cheers,
>> > >>>>>>>>>> Aljoscha
>> > >>>>>>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com> wrote:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Could this part of the extractor be the problem, Aljoscha?
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> @Override
>> > >>>>>>>>>>> public long getCurrentWatermark() {
>> > >>>>>>>>>>>     return Long.MIN_VALUE;
>> > >>>>>>>>>>> }
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Gyula
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Konstantin Knauf <konstantin.kn...@tngtech.com> wrote (on Mon, Nov 16, 2015, 10:39):
>> > >>>>>>>>>>> Hi Aljoscha,
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> thanks for your answer. Yes, I am using the same TimestampExtractor class.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> The timestamps look good to me. Here is an example.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> The order now is
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> stream
>> > >>>>>>>>>>>     .map(dummyMapper)
>> > >>>>>>>>>>>     .assignTimestamps(...)
>> > >>>>>>>>>>>     .timeWindow(...)
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Is there a way to print out the assigned timestamps after stream.assignTimestamps(...)?
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Cheers,
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Konstantin
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>> > >>>>>>>>>>>> Hi,
>> > >>>>>>>>>>>> are you also using the timestamp extractor when you are using env.fromCollection()?
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Could you maybe insert a dummy mapper after the Kafka source that just prints the element and forwards it? That would show whether the elements come with a good timestamp from Kafka.
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Cheers,
>> > >>>>>>>>>>>> Aljoscha
>> > >>>>>>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Hi everyone,
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> I am using a very simple TimestampExtractor like [1], which just extracts a millis timestamp from a POJO. In my streaming job, I read in these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<>(
>> > >>>>>>>>>>>>>     parameterTool.getRequired("topic"),
>> > >>>>>>>>>>>>>     new AvroPojoDeserializationSchema(),
>> > >>>>>>>>>>>>>     parameterTool.getProperties()));
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> I have timestampEnabled(), the TimeCharacteristic is EventTime, and the auto-watermark interval is 500.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> The problem is, when I do something like:
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>> > >>>>>>>>>>>>>     .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>> > >>>>>>>>>>>>>     .sum(..)
>> > >>>>>>>>>>>>>     .print();
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> env.execute();
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> the windows never get triggered.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> If I use ProcessingTime it works.
>> > >>>>>>>>>>>>> If I use env.fromCollection(...) instead of the KafkaSource it works with EventTime, too.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Any ideas what I could be doing wrong are highly appreciated.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Cheers,
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Konstantin
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> [1]:
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>     final private long maxDelay;
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>     public PojoTimestampExtractor(long maxDelay) {
>> > >>>>>>>>>>>>>         this.maxDelay = maxDelay;
>> > >>>>>>>>>>>>>     }
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>     @Override
>> > >>>>>>>>>>>>>     public long extractTimestamp(Pojo pojo, long l) {
>> > >>>>>>>>>>>>>         return pojo.getTime();
>> > >>>>>>>>>>>>>     }
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>     @Override
>> > >>>>>>>>>>>>>     public long extractWatermark(Pojo pojo, long l) {
>> > >>>>>>>>>>>>>         return pojo.getTime() - maxDelay;
>> > >>>>>>>>>>>>>     }
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>     @Override
>> > >>>>>>>>>>>>>     public long getCurrentWatermark() {
>> > >>>>>>>>>>>>>         return Long.MIN_VALUE;
>> > >>>>>>>>>>>>>     }
>> > >>>>>>>>>>>>> }
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> --
>> > >>>>>>>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>> > >>>>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> > >>>>>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> > >>>>>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
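
For readers of the archive, the arithmetic discussed in this thread can be reproduced with a small standalone sketch. It does not use Flink at all: the class WatermarkSketch and its methods combine and currentWatermark are hypothetical names made up for illustration, and the sketch assumes (as described in the thread) that an operator takes the minimum over its upstream watermarks and that an input which has never emitted a watermark counts as Long.MIN_VALUE.

```java
import java.util.Arrays;

// Hypothetical helper class for illustration; none of these names are Flink API.
class WatermarkSketch {

    // Assumption from the thread: an operator's watermark is the minimum
    // over the watermarks of all upstream parallel instances.
    static long combine(long... upstreamWatermarks) {
        return Arrays.stream(upstreamWatermarks).min().orElse(Long.MIN_VALUE);
    }

    // Same arithmetic as getCurrentWatermark() in the second extractor.
    // Java long subtraction wraps around silently on overflow.
    static long currentWatermark(long lastTimestamp, long maxDelay) {
        return lastTimestamp - maxDelay;
    }

    public static void main(String[] args) {
        long maxDelay = 6000L;

        // Instance that receives elements (timestamp taken from the thread).
        long active = currentWatermark(1447666537260L, maxDelay);

        // Instance that never receives elements: lastTimestamp is still Long.MIN_VALUE,
        // so the subtraction wraps to Long.MAX_VALUE - maxDelay + 1.
        long idle = currentWatermark(Long.MIN_VALUE, maxDelay);
        System.out.println(idle == Long.MAX_VALUE - maxDelay + 1); // true

        // First extractor: the idle instance never emits, so (by assumption) the
        // downstream operator still sees Long.MIN_VALUE for it and cannot advance.
        System.out.println(combine(active, Long.MIN_VALUE) == Long.MIN_VALUE); // true

        // Second extractor: the wrapped value is huge, so the minimum tracks the active instance.
        System.out.println(combine(active, idle) == active); // true

        // Proposed fix: idle instances emit Long.MAX_VALUE explicitly.
        System.out.println(combine(active, Long.MAX_VALUE) == active); // true
    }
}
```

The second extractor therefore works only as a side effect of the overflow; emitting Long.MAX_VALUE from instances without assigned partitions, as suggested at the top of the thread, expresses the same intent explicitly.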