I think we need to find a solution for this problem soon. Another user is most likely affected: http://stackoverflow.com/q/34090808/568695
I've filed a JIRA for the problem: https://issues.apache.org/jira/browse/FLINK-3121

On Mon, Nov 30, 2015 at 5:58 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Maybe. In the Kafka case we just need to ensure that parallel instances of the source that know that they don’t have any partitions assigned to them emit Long.MAX_VALUE as a watermark.
>
> > On 30 Nov 2015, at 17:50, Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> > Hi,
> >
> > I think what we will need at some point for this are approximate watermarks which correlate event and ingest time.
> >
> > I think they have similar concepts in Millwheel/Dataflow.
> >
> > Cheers,
> > Gyula
> >
> > On Mon, Nov 30, 2015 at 5:29 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> > Hi,
> > as an addition: I don’t have a solution yet for the general problem of what happens when a parallel instance of a source never receives elements. This watermark business is very tricky...
> >
> > Cheers,
> > Aljoscha
> > > On 30 Nov 2015, at 17:20, Aljoscha Krettek <aljos...@apache.org> wrote:
> > >
> > > Hi Konstantin,
> > > I finally nailed down the problem. :-)
> > >
> > > The basis of the problem is the fact that there is a mismatch between the parallelism of the Flink Kafka Consumer and the number of partitions in the Kafka Stream. I would assume that in your case the Kafka Stream has 1 partition. This means that only one of the parallel instances of the Flink Kafka Consumer ever receives elements, which in turn means that only one of the parallel instances of the timestamp extractor ever receives elements. This means that no watermarks get emitted for the other parallel instances, which in turn means that the watermark does not advance downstream, because the watermark at an operator is the minimum over all upstream watermarks. This explains why ExampleTimestampExtractor1 only works in the case with parallelism=1.
> > >
> > > The reason why ExampleTimestampExtractor2 works in all parallelism settings is not very obvious. The secret is in this method:
> > >
> > >     @Override
> > >     public long getCurrentWatermark() {
> > >         return lastTimestamp - maxDelay;
> > >     }
> > >
> > > In the parallel instances that never receive any element, lastTimestamp stays at its initial value of Long.MIN_VALUE. So “lastTimestamp - maxDelay” overflows and evaluates to Long.MAX_VALUE - maxDelay + 1. Now, because the watermark at an operator is always the minimum over all watermarks from upstream operators, the watermark at the window operator always tracks the watermark of the parallel instance that receives elements.
> > >
> > > I hope this helps, but please let me know if I should provide more explanation. This is a very tricky topic.
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > >> On 29 Nov 2015, at 21:18, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
> > >>
> > >> Hi Aljoscha,
> > >>
> > >> I have put together a gist [1] with two classes: a short processing pipeline, which shows the behavior, and a data generator to write records into Kafka. I hope I remembered everything we discussed correctly.
> > >>
> > >> So basically in the example it works with "TimestampExtractor1" only for parallelism 1; with "TimestampExtractor2" it works regardless of the parallelism. Run from the IDE.
> > >>
> > >> Let me know if you need anything else.
> > >>
> > >> Cheers,
> > >>
> > >> Konstantin
> > >>
> > >> [1] https://gist.github.com/knaufk/d57b5c3c7db576f3350d
> > >>
> > >> On 25.11.2015 21:15, Konstantin Knauf wrote:
> > >>> Hi Aljoscha,
> > >>>
> > >>> sure, will do. I have not found a solution either. I won't have time to put a minimal example together before the weekend though.
> > >>>
> > >>> Cheers,
> > >>>
> > >>> Konstantin
> > >>>
> > >>> On 25.11.2015 19:10, Aljoscha Krettek wrote:
> > >>>> Hi Konstantin,
> > >>>> I still didn’t come up with an explanation for the behavior.
> > >>>> Could you maybe send me example code (and example data if it is necessary to reproduce the problem)? This would really help me pinpoint the problem.
> > >>>>
> > >>>> Cheers,
> > >>>> Aljoscha
> > >>>>> On 17 Nov 2015, at 21:42, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
> > >>>>>
> > >>>>> Hi Aljoscha,
> > >>>>>
> > >>>>> Are you sure? I am running the job from my IDE at the moment.
> > >>>>>
> > >>>>> If I set
> > >>>>>
> > >>>>>     StreamExecutionEnvironment.setParallelism(1);
> > >>>>>
> > >>>>> it works with the old TimestampExtractor (returning Long.MIN_VALUE from getCurrentWatermark() and emitting a watermark at every record).
> > >>>>>
> > >>>>> If I set
> > >>>>>
> > >>>>>     StreamExecutionEnvironment.setParallelism(5);
> > >>>>>
> > >>>>> it does not work.
> > >>>>>
> > >>>>> So, if I understood you correctly, it is the opposite of what you were expecting?!
> > >>>>>
> > >>>>> Cheers,
> > >>>>>
> > >>>>> Konstantin
> > >>>>>
> > >>>>>
> > >>>>> On 17.11.2015 11:32, Aljoscha Krettek wrote:
> > >>>>>> Hi,
> > >>>>>> actually, the bug is more subtle. Normally, it is not a problem that the TimestampExtractor sometimes emits a watermark that is lower than the one before. (This is the result of the bug with Long.MIN_VALUE I mentioned before.) The stream operators wait for watermarks from all upstream operators and only advance the watermark monotonically in lockstep with them. This way, the watermark cannot decrease at an operator.
> > >>>>>>
> > >>>>>> In your case, you have a topology with parallelism 1, I assume. In that case the operators are chained. (There are no separate operators but basically only one operator, and element transmission happens in function calls.) In this setting the watermarks are directly forwarded to operators without going through the logic I mentioned above.
> > >>>>>>
> > >>>>>> Cheers,
> > >>>>>> Aljoscha
> > >>>>>>> On 16 Nov 2015, at 18:13, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
> > >>>>>>>
> > >>>>>>> Hi Aljoscha,
> > >>>>>>>
> > >>>>>>> I changed the TimestampExtractor to save the lastSeenTimestamp and only emit with getCurrentWatermark [1] as you suggested. So basically I do the opposite of before (only per-event watermarks vs. only auto-watermark-interval watermarks). And now it works :). The question remains why it did not work before. As far as I can see, it is an issue with the first TimestampExtractor itself?!
> > >>>>>>>
> > >>>>>>> Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>>
> > >>>>>>> Konstantin
> > >>>>>>>
> > >>>>>>> [1]
> > >>>>>>>
> > >>>>>>>     final private long maxDelay;
> > >>>>>>>     private long lastTimestamp = Long.MIN_VALUE;
> > >>>>>>>
> > >>>>>>>     public PojoTimestampExtractor(long maxDelay) {
> > >>>>>>>         this.maxDelay = maxDelay;
> > >>>>>>>     }
> > >>>>>>>
> > >>>>>>>     @Override
> > >>>>>>>     public long extractTimestamp(Pojo pojo, long l) {
> > >>>>>>>         lastTimestamp = pojo.getTime();
> > >>>>>>>         return pojo.getTime();
> > >>>>>>>     }
> > >>>>>>>
> > >>>>>>>     @Override
> > >>>>>>>     public long extractWatermark(Pojo pojo, long l) {
> > >>>>>>>         return Long.MIN_VALUE;
> > >>>>>>>     }
> > >>>>>>>
> > >>>>>>>     @Override
> > >>>>>>>     public long getCurrentWatermark() {
> > >>>>>>>         return lastTimestamp - maxDelay;
> > >>>>>>>     }
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
> > >>>>>>>> Hi,
> > >>>>>>>> yes, at your data-rate emitting a watermark for every element should not be a problem. It could become a problem with higher data-rates since the system can get overwhelmed if every element also generates a watermark.
> > >>>>>>>> In that case I would suggest storing the latest element-timestamp in an internal field and only emitting in getCurrentWatermark(), since then the watermark interval can be tuned using the auto-watermark interval setting.
> > >>>>>>>>
> > >>>>>>>> But that should not be the cause of the problem that you currently have. Would you maybe be willing to send me some (mock) example data and the code so that I can reproduce the problem and have a look at it? You can send it to aljoscha at apache.org.
> > >>>>>>>>
> > >>>>>>>> Cheers,
> > >>>>>>>> Aljoscha
> > >>>>>>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
> > >>>>>>>>>
> > >>>>>>>>> Hi Aljoscha,
> > >>>>>>>>>
> > >>>>>>>>> ok, now I at least understand why it works with fromElements(...). For the rest I am not so sure.
> > >>>>>>>>>
> > >>>>>>>>>> What this means in your case is that the watermark can only advance if a new element arrives, because only then is the watermark updated.
> > >>>>>>>>>
> > >>>>>>>>> But new elements arrive all the time, about 50/s, or do you mean something else?
> > >>>>>>>>>
> > >>>>>>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an ok choice, if I understand the semantics correctly. It just affects watermarking in the absence of events, right?
> > >>>>>>>>>
> > >>>>>>>>> Cheers,
> > >>>>>>>>>
> > >>>>>>>>> Konstantin
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
> > >>>>>>>>>> Hi,
> > >>>>>>>>>> it could be what Gyula mentioned. Let me first go a bit into how the TimestampExtractor works internally.
> > >>>>>>>>>>
> > >>>>>>>>>> First, the timestamp extractor internally keeps the value of the last emitted watermark.
> > >>>>>>>>>> Then, the semantics of the TimestampExtractor are as follows:
> > >>>>>>>>>> - the result of extractTimestamp is taken and it replaces the internal timestamp of the element
> > >>>>>>>>>> - if the result of extractWatermark is larger than the last watermark, the new value is emitted as a watermark and the value is stored
> > >>>>>>>>>> - getCurrentWatermark is called on the specified auto-watermark interval; if the returned value is larger than the last watermark, it is emitted and stored as the last watermark
> > >>>>>>>>>>
> > >>>>>>>>>> What this means in your case is that the watermark can only advance if a new element arrives, because only then is the watermark updated.
> > >>>>>>>>>>
> > >>>>>>>>>> The reason why you see results if you use fromElements is that the window operator also emits all the windows that it currently has buffered when the program closes. This happens in the case of fromElements because only a finite number of elements is emitted, after which the source closes, thereby finishing the whole program.
> > >>>>>>>>>>
> > >>>>>>>>>> Cheers,
> > >>>>>>>>>> Aljoscha
> > >>>>>>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> Could this part of the extractor be the problem, Aljoscha?
> > >>>>>>>>>>>
> > >>>>>>>>>>>     @Override
> > >>>>>>>>>>>     public long getCurrentWatermark() {
> > >>>>>>>>>>>         return Long.MIN_VALUE;
> > >>>>>>>>>>>     }
> > >>>>>>>>>>>
> > >>>>>>>>>>> Gyula
> > >>>>>>>>>>>
> > >>>>>>>>>>> Konstantin Knauf <konstantin.kn...@tngtech.com> wrote (on Mon, Nov 16, 2015, 10:39):
> > >>>>>>>>>>> Hi Aljoscha,
> > >>>>>>>>>>>
> > >>>>>>>>>>> thanks for your answer. Yes, I am using the same TimestampExtractor class.
> > >>>>>>>>>>>
> > >>>>>>>>>>> The timestamps look good to me. Here is an example.
> > >>>>>>>>>>>
> > >>>>>>>>>>> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
> > >>>>>>>>>>>
> > >>>>>>>>>>> The order now is
> > >>>>>>>>>>>
> > >>>>>>>>>>>     stream
> > >>>>>>>>>>>         .map(dummyMapper)
> > >>>>>>>>>>>         .assignTimestamps(...)
> > >>>>>>>>>>>         .timeWindow(...)
> > >>>>>>>>>>>
> > >>>>>>>>>>> Is there a way to print out the assigned timestamps after stream.assignTimestamps(...)?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Cheers,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Konstantin
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
> > >>>>>>>>>>>> Hi,
> > >>>>>>>>>>>> are you also using the timestamp extractor when you are using env.fromCollection()?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Could you maybe insert a dummy mapper after the Kafka source that just prints the element and forwards it, to see if the elements come with a good timestamp from Kafka?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>> Aljoscha
> > >>>>>>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi everyone,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I am using a very simple TimestampExtractor like [1], which just extracts a millis timestamp from a POJO. In my streaming job, I read in these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>     stream = env.addSource(new FlinkKafkaConsumer082<>(
> > >>>>>>>>>>>>>         parameterTool.getRequired("topic"),
> > >>>>>>>>>>>>>         new AvroPojoDeserializationSchema(),
> > >>>>>>>>>>>>>         parameterTool.getProperties()))
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I have timestampEnabled() and the TimeCharacteristic is EventTime; AutoWatermarkInterval is 500.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The problem is, when I do something like:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>     stream.assignTimestamps(new PojoTimestampExtractor(6000))
> > >>>>>>>>>>>>>         .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
> > >>>>>>>>>>>>>         .sum(..)
> > >>>>>>>>>>>>>         .print()
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>     env.execute();
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> the windows never get triggered.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> If I use ProcessingTime it works.
> > >>>>>>>>>>>>> If I use env.fromCollection(...) instead of the KafkaSource it works with EventTime, too.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Any ideas what I could be doing wrong are highly appreciated.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Konstantin
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> [1]:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>     public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>         final private long maxDelay;
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>         public PojoTimestampExtractor(long maxDelay) {
> > >>>>>>>>>>>>>             this.maxDelay = maxDelay;
> > >>>>>>>>>>>>>         }
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>         @Override
> > >>>>>>>>>>>>>         public long extractTimestamp(Pojo pojo, long l) {
> > >>>>>>>>>>>>>             return pojo.getTime();
> > >>>>>>>>>>>>>         }
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>         @Override
> > >>>>>>>>>>>>>         public long extractWatermark(Pojo pojo, long l) {
> > >>>>>>>>>>>>>             return pojo.getTime() - maxDelay;
> > >>>>>>>>>>>>>         }
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>         @Override
> > >>>>>>>>>>>>>         public long getCurrentWatermark() {
> > >>>>>>>>>>>>>             return Long.MIN_VALUE;
> > >>>>>>>>>>>>>         }
> > >>>>>>>>>>>>>     }
> > >>>>>>>>>>>
> > >>>>>>>>>>> --
> > >>>>>>>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> > >>>>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> > >>>>>>>>>>> Managing Directors: Henrik Klagges, Christoph Stock, Dr.
> > >>>>>>>>>>> Robert Dahlke
> > >>>>>>>>>>> Registered office: Unterföhring * Amtsgericht München * HRB 135082
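
For anyone reading the thread later, here is a minimal sketch of the two effects Aljoscha describes: the wrap-around of "lastTimestamp - maxDelay" in an idle instance, and the minimum-over-upstream rule at the window operator. It is plain Java and does not use any Flink API. The class and method names (WatermarkSketch, idleWatermark, combinedWatermark) are made up for illustration, and modelling an idle ExampleTimestampExtractor1 instance as a constant Long.MIN_VALUE input is an assumption, not something taken from the Flink source.

```java
import java.util.Arrays;

// Hypothetical illustration class, not part of Flink.
final class WatermarkSketch {

    // Watermark reported by an ExampleTimestampExtractor2-style instance that
    // never saw an element: lastTimestamp is still Long.MIN_VALUE, so the
    // subtraction silently wraps around (two's-complement overflow).
    static long idleWatermark(long maxDelay) {
        long lastTimestamp = Long.MIN_VALUE;
        return lastTimestamp - maxDelay;
    }

    // The watermark at an operator is the minimum over all upstream watermarks.
    static long combinedWatermark(long... upstream) {
        return Arrays.stream(upstream).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long maxDelay = 6000L;
        // Instance that actually receives elements (timestamp from the thread).
        long active = 1447666537260L - maxDelay;
        long idle = idleWatermark(maxDelay);

        // Extractor2 case: the idle instance reports a huge value, so the
        // minimum follows the active instance and windows can fire.
        System.out.println(idle == Long.MAX_VALUE - maxDelay + 1);
        System.out.println(combinedWatermark(active, idle) == active);

        // Extractor1 case (assumed model): the idle input never advances past
        // Long.MIN_VALUE, so the combined watermark is stuck there.
        System.out.println(combinedWatermark(active, Long.MIN_VALUE) == Long.MIN_VALUE);
    }
}
```

All three checks print true. This also suggests why emitting Long.MAX_VALUE from source instances without assigned partitions, as proposed at the top of the thread, would have the same effect without relying on an overflow.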