Maybe. In the Kafka case we just need to ensure that parallel instances of the source that know that they don’t have any partitions assigned to them emit Long.MAX_VALUE as a watermark.
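The proposal above depends on how a downstream operator combines watermarks from its parallel inputs, as described further down in this thread: it takes the minimum over all inputs and only ever moves forward. The following is a minimal sketch of that rule under those assumptions, not Flink's implementation; WatermarkCombiner and onWatermark are hypothetical names. It shows that one silent source instance pins the combined watermark at Long.MIN_VALUE, while an instance that emits Long.MAX_VALUE stops holding it back.

```java
import java.util.Arrays;

/**
 * Hypothetical model (not Flink's actual classes) of how a downstream operator
 * combines watermarks from its parallel upstream inputs: it keeps the latest
 * watermark per input, takes the minimum, and only emits when that minimum
 * increases, so the operator's watermark never goes backwards.
 */
class WatermarkCombiner {
    private final long[] inputWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    WatermarkCombiner(int numInputs) {
        inputWatermarks = new long[numInputs];
        // Until an input reports a watermark, it holds the combined watermark at MIN_VALUE.
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input and returns the operator's watermark afterwards. */
    long onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        if (min > currentWatermark) {
            currentWatermark = min; // advance monotonically, never decrease
        }
        return currentWatermark;
    }

    public static void main(String[] args) {
        // Two parallel source instances; only instance 0 has a Kafka partition.
        WatermarkCombiner stuck = new WatermarkCombiner(2);
        System.out.println(stuck.onWatermark(0, 1_000L)); // input 1 never reports: stays at MIN_VALUE

        WatermarkCombiner fixed = new WatermarkCombiner(2);
        fixed.onWatermark(1, Long.MAX_VALUE);             // idle instance signals "no data will come"
        System.out.println(fixed.onWatermark(0, 1_000L)); // now tracks the active instance: 1000
    }
}
```

Running main prints the minimum long value for the first combiner and 1000 for the second. Once an instance has emitted Long.MAX_VALUE, every element it produced afterwards would be late, which is why the idea is limited to instances that know they have no partitions assigned.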
> On 30 Nov 2015, at 17:50, Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> Hi,
>
> I think what we will need at some point for this are approximate watermarks which correlate event and ingest time.
>
> I think they have similar concepts in MillWheel/Dataflow.
>
> Cheers,
> Gyula
>
> On Mon, Nov 30, 2015 at 5:29 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> as an addition: I don’t have a solution yet for the general problem of what happens when a parallel instance of a source never receives elements. This watermark business is very tricky...
>
> Cheers,
> Aljoscha
>
> > On 30 Nov 2015, at 17:20, Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> > Hi Konstantin,
> > I finally nailed down the problem. :-)
> >
> > The basis of the problem is a mismatch between the parallelism of the Flink Kafka Consumer and the number of partitions in the Kafka topic. I would assume that in your case the Kafka topic has 1 partition. This means that only one of the parallel instances of the Flink Kafka Consumer ever receives elements, which in turn means that only one of the parallel instances of the timestamp extractor ever receives elements. As a result, no watermarks get emitted for the other parallel instances, so the watermark does not advance downstream, because the watermark at an operator is the minimum over all upstream watermarks. This explains why ExampleTimestampExtractor1 only works with parallelism=1.
> >
> > The reason why ExampleTimestampExtractor2 works with all parallelism settings is not very obvious. The secret is in this method:
> >
> > @Override
> > public long getCurrentWatermark() {
> >     return lastTimestamp - maxDelay;
> > }
> >
> > In the parallel instances that never receive any elements, lastTimestamp stays at its initial value of Long.MIN_VALUE. Because long arithmetic overflows silently, “lastTimestamp - maxDelay” wraps around to (Long.MAX_VALUE - maxDelay + 1).
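The wrap-around in getCurrentWatermark() can be checked in isolation. A minimal sketch, assuming the maxDelay of 6000 used in the original job; WatermarkOverflowDemo and currentWatermark are hypothetical names that mirror the method quoted above.

```java
/**
 * Shows why the second extractor "works" for idle instances:
 * Long.MIN_VALUE - maxDelay overflows and wraps around to a very large value.
 */
class WatermarkOverflowDemo {

    /** Mirrors getCurrentWatermark() from the thread: lastTimestamp - maxDelay. */
    static long currentWatermark(long lastTimestamp, long maxDelay) {
        return lastTimestamp - maxDelay; // plain long subtraction, no overflow check
    }

    public static void main(String[] args) {
        long maxDelay = 6000L; // the maxDelay used in the original job
        long idle = currentWatermark(Long.MIN_VALUE, maxDelay);
        System.out.println(idle == Long.MAX_VALUE - maxDelay + 1); // true

        // Math.subtractExact makes the same overflow explicit instead of silent.
        try {
            Math.subtractExact(Long.MIN_VALUE, maxDelay);
        } catch (ArithmeticException e) {
            System.out.println("overflow detected: " + e.getMessage());
        }
    }
}
```

The wrap-around is accidental rather than a designed signal; Math.subtractExact (Java 8+) is one way to surface such overflows while debugging.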
> > Now, because the watermark at an operator is always the minimum over all watermarks from upstream operators, the watermark at the window operator always tracks the watermark of the parallel instance that receives elements.
> >
> > I hope this helps, but please let me know if I should provide more explanation. This is a very tricky topic.
> >
> > Cheers,
> > Aljoscha
> >
> >> On 29 Nov 2015, at 21:18, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
> >>
> >> Hi Aljoscha,
> >>
> >> I have put together a gist [1] with two classes, a short processing pipeline that shows the behavior, and a data generator that writes records into Kafka. I hope I remembered everything we discussed correctly.
> >>
> >> So basically, in the example it works with "TimestampExtractor1" only for parallelism 1; with "TimestampExtractor2" it works regardless of the parallelism. Run it from the IDE.
> >>
> >> Let me know if you need anything else.
> >>
> >> Cheers,
> >>
> >> Konstantin
> >>
> >> [1] https://gist.github.com/knaufk/d57b5c3c7db576f3350d
> >>
> >> On 25.11.2015 21:15, Konstantin Knauf wrote:
> >>> Hi Aljoscha,
> >>>
> >>> sure, will do. I haven’t found a solution either. I won't have time to put a minimal example together before the weekend, though.
> >>>
> >>> Cheers,
> >>>
> >>> Konstantin
> >>>
> >>> On 25.11.2015 19:10, Aljoscha Krettek wrote:
> >>>> Hi Konstantin,
> >>>> I still haven’t come up with an explanation for the behavior. Could you maybe send me example code (and example data, if it is necessary to reproduce the problem)? This would really help me pinpoint the problem.
> >>>>
> >>>> Cheers,
> >>>> Aljoscha
> >>>>> On 17 Nov 2015, at 21:42, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
> >>>>>
> >>>>> Hi Aljoscha,
> >>>>>
> >>>>> Are you sure? I am running the job from my IDE at the moment.
> >>>>>
> >>>>> If I set
> >>>>>
> >>>>> StreamExecutionEnvironment.setParallelism(1);
> >>>>>
> >>>>> it works with the old TimestampExtractor (returning Long.MIN_VALUE from getCurrentWatermark() and emitting a watermark at every record).
> >>>>>
> >>>>> If I set
> >>>>>
> >>>>> StreamExecutionEnvironment.setParallelism(5);
> >>>>>
> >>>>> it does not work.
> >>>>>
> >>>>> So, if I understood you correctly, it is the opposite of what you were expecting?!
> >>>>>
> >>>>> Cheers,
> >>>>>
> >>>>> Konstantin
> >>>>>
> >>>>>
> >>>>> On 17.11.2015 11:32, Aljoscha Krettek wrote:
> >>>>>> Hi,
> >>>>>> actually, the bug is more subtle. Normally, it is not a problem that the TimestampExtractor sometimes emits a watermark that is lower than the one before. (This is the result of the bug with Long.MIN_VALUE I mentioned before.) The stream operators wait for watermarks from all upstream operators and only advance the watermark monotonically, in lockstep with them. This way, the watermark cannot decrease at an operator.
> >>>>>>
> >>>>>> In your case, you have a topology with parallelism 1, I assume. In that case the operators are chained. (There are no separate operators; there is basically only one operator, and elements are transmitted via function calls.) In this setting the watermarks are forwarded directly to the operators without going through the logic I mentioned above.
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Aljoscha
> >>>>>>> On 16 Nov 2015, at 18:13, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
> >>>>>>>
> >>>>>>> Hi Aljoscha,
> >>>>>>>
> >>>>>>> I changed the TimestampExtractor to save the lastSeenTimestamp and only emit watermarks in getCurrentWatermark [1], as you suggested. So basically I do the opposite of before (previously watermarks only per event, now watermarks only per auto-watermark interval). And now it works :).
> >>>>>>> The question remains why it did not work before. As far as I can see, it is an issue with the first TimestampExtractor itself?!
> >>>>>>>
> >>>>>>> Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>>
> >>>>>>> Konstantin
> >>>>>>>
> >>>>>>> [1]
> >>>>>>>
> >>>>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
> >>>>>>>
> >>>>>>>     final private long maxDelay;
> >>>>>>>     private long lastTimestamp = Long.MIN_VALUE; // latest element timestamp seen so far
> >>>>>>>
> >>>>>>>     public PojoTimestampExtractor(long maxDelay) {
> >>>>>>>         this.maxDelay = maxDelay;
> >>>>>>>     }
> >>>>>>>
> >>>>>>>     @Override
> >>>>>>>     public long extractTimestamp(Pojo pojo, long l) {
> >>>>>>>         lastTimestamp = pojo.getTime();
> >>>>>>>         return pojo.getTime();
> >>>>>>>     }
> >>>>>>>
> >>>>>>>     @Override
> >>>>>>>     public long extractWatermark(Pojo pojo, long l) {
> >>>>>>>         return Long.MIN_VALUE; // no per-element watermarks
> >>>>>>>     }
> >>>>>>>
> >>>>>>>     @Override
> >>>>>>>     public long getCurrentWatermark() {
> >>>>>>>         return lastTimestamp - maxDelay;
> >>>>>>>     }
> >>>>>>> }
> >>>>>>>
> >>>>>>>
> >>>>>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
> >>>>>>>> Hi,
> >>>>>>>> yes, at your data rate emitting a watermark for every element should not be a problem. It could become a problem with higher data rates, since the system can get overwhelmed if every element also generates a watermark. In that case I would suggest storing the latest element timestamp in an internal field and only emitting watermarks in getCurrentWatermark(), since then the watermark interval can be tuned using the auto-watermark interval setting.
> >>>>>>>>
> >>>>>>>> But that should not be the cause of the problem that you currently have. Would you maybe be willing to send me some (mock) example data and the code so that I can reproduce the problem and have a look at it? Please send it to aljoscha at apache.org.
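The suggestion above (remember the latest element timestamp and emit watermarks only from getCurrentWatermark() on the auto-watermark interval), together with the emission rule Aljoscha describes further down in this thread (a watermark is only emitted if it is larger than the last one), can be simulated without Flink. A minimal sketch under those assumptions; ExtractorSimulation, Extractor and PeriodicExtractor are hypothetical stand-ins that use plain long event times instead of the Pojo type.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simulation (not Flink code) of the TimestampExtractor emission rules
 * described in this thread: a watermark is emitted only if it is larger than the
 * last emitted one, whether it comes from extractWatermark() or getCurrentWatermark().
 */
class ExtractorSimulation {

    /** Stand-in for the three extractor methods discussed in the thread. */
    interface Extractor {
        long extractTimestamp(long eventTime);
        long extractWatermark(long eventTime);
        long getCurrentWatermark();
    }

    /** Mirrors the revised extractor [1]: remember the latest timestamp, emit only periodically. */
    static class PeriodicExtractor implements Extractor {
        private final long maxDelay;
        private long lastTimestamp = Long.MIN_VALUE;

        PeriodicExtractor(long maxDelay) { this.maxDelay = maxDelay; }

        public long extractTimestamp(long eventTime) { lastTimestamp = eventTime; return eventTime; }
        public long extractWatermark(long eventTime) { return Long.MIN_VALUE; } // never emits per element
        public long getCurrentWatermark() { return lastTimestamp - maxDelay; }
    }

    private final Extractor extractor;
    private long lastEmitted = Long.MIN_VALUE;
    final List<Long> emitted = new ArrayList<>();

    ExtractorSimulation(Extractor extractor) { this.extractor = extractor; }

    /** Per element: take the timestamp, then consider the per-element watermark. */
    void onElement(long eventTime) {
        extractor.extractTimestamp(eventTime);
        maybeEmit(extractor.extractWatermark(eventTime));
    }

    /** Called once per auto-watermark interval. */
    void onTimer() {
        maybeEmit(extractor.getCurrentWatermark());
    }

    private void maybeEmit(long watermark) {
        if (watermark > lastEmitted) { // only strictly larger watermarks are emitted
            lastEmitted = watermark;
            emitted.add(watermark);
        }
    }

    public static void main(String[] args) {
        ExtractorSimulation sim = new ExtractorSimulation(new PeriodicExtractor(6000L));
        sim.onElement(10_000L);
        sim.onElement(12_000L);
        sim.onTimer();          // emits 12000 - 6000 = 6000
        sim.onElement(11_000L); // out-of-order element: lastTimestamp drops to 11000
        sim.onTimer();          // 5000 is not larger than 6000, so nothing is emitted
        System.out.println(sim.emitted); // [6000]
    }
}
```

With no elements at all, a single onTimer() on a fresh simulation emits Long.MAX_VALUE - 5999, which is the overflow effect discussed at the top of the thread.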
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Aljoscha
> >>>>>>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Aljoscha,
> >>>>>>>>>
> >>>>>>>>> OK, now I at least understand why it works with fromElements(...). For the rest I am not so sure.
> >>>>>>>>>
> >>>>>>>>>> What this means in your case is that the watermark can only advance if a new element arrives, because only then is the watermark updated.
> >>>>>>>>>
> >>>>>>>>> But new elements arrive all the time, about 50/s, or do you mean something else?
> >>>>>>>>>
> >>>>>>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an OK choice, if I understand the semantics correctly. It just affects watermarking in the absence of events, right?
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>>
> >>>>>>>>> Konstantin
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
> >>>>>>>>>> Hi,
> >>>>>>>>>> it could be what Gyula mentioned. Let me first go a bit into how the TimestampExtractor works internally.
> >>>>>>>>>>
> >>>>>>>>>> First, the timestamp extractor internally keeps the value of the last emitted watermark.
> >>>>>>>>>> Then, the semantics of the TimestampExtractor are as follows:
> >>>>>>>>>> - the result of extractTimestamp is taken and replaces the internal timestamp of the element
> >>>>>>>>>> - if the result of extractWatermark is larger than the last watermark, the new value is emitted as a watermark and stored
> >>>>>>>>>> - getCurrentWatermark is called on the specified auto-watermark interval; if the returned value is larger than the last watermark, it is emitted and stored as the last watermark
> >>>>>>>>>>
> >>>>>>>>>> What this means in your case is that the watermark can only advance if a new element arrives, because only then is the watermark updated.
> >>>>>>>>>>
> >>>>>>>>>> The reason why you see results if you use fromElements is that the window operator also emits all the windows that it currently has buffered when the program closes. This happens in the case of fromElements because only a finite number of elements is emitted, after which the source closes, thereby finishing the whole program.
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Aljoscha
> >>>>>>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Could this part of the extractor be the problem, Aljoscha?
> >>>>>>>>>>>
> >>>>>>>>>>> @Override
> >>>>>>>>>>> public long getCurrentWatermark() {
> >>>>>>>>>>>     return Long.MIN_VALUE;
> >>>>>>>>>>> }
> >>>>>>>>>>>
> >>>>>>>>>>> Gyula
> >>>>>>>>>>>
> >>>>>>>>>>> Konstantin Knauf <konstantin.kn...@tngtech.com> wrote (Mon, 16 Nov 2015, 10:39):
> >>>>>>>>>>> Hi Aljoscha,
> >>>>>>>>>>>
> >>>>>>>>>>> thanks for your answer. Yes, I am using the same TimestampExtractor class.
> >>>>>>>>>>>
> >>>>>>>>>>> The timestamps look good to me. Here is an example:
> >>>>>>>>>>>
> >>>>>>>>>>> {"time": 1447666537260, ...} and parsed: 2015-11-16T10:35:37.260+01:00
> >>>>>>>>>>>
> >>>>>>>>>>> The order now is
> >>>>>>>>>>>
> >>>>>>>>>>> stream
> >>>>>>>>>>>     .map(dummyMapper)
> >>>>>>>>>>>     .assignTimestamps(...)
> >>>>>>>>>>>     .timeWindow(...)
> >>>>>>>>>>>
> >>>>>>>>>>> Is there a way to print out the assigned timestamps after stream.assignTimestamps(...)?
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>>
> >>>>>>>>>>> Konstantin
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
> >>>>>>>>>>>> Hi,
> >>>>>>>>>>>> are you also using the timestamp extractor when you are using env.fromCollection()?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Could you maybe insert a dummy mapper after the Kafka source that just prints the element and forwards it? That way we can see whether the elements come from Kafka with a good timestamp.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I am using a very simple TimestampExtractor like [1], which just extracts a millis timestamp from a POJO. In my streaming job, I read in these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<>(
> >>>>>>>>>>>>>         parameterTool.getRequired("topic"),
> >>>>>>>>>>>>>         new AvroPojoDeserializationSchema(),
> >>>>>>>>>>>>>         parameterTool.getProperties()));
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I have timestamps enabled, the TimeCharacteristic is EventTime, and the AutoWatermarkInterval is 500.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The problem is, when I do something like:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
> >>>>>>>>>>>>>         .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
> >>>>>>>>>>>>>         .sum(..)
> >>>>>>>>>>>>>         .print();
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> env.execute();
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> the windows never get triggered.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> If I use ProcessingTime, it works.
> >>>>>>>>>>>>> If I use env.fromCollection(...) instead of the Kafka source, it works with EventTime, too.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Any ideas about what I could be doing wrong are highly appreciated.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Konstantin
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1]:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>     final private long maxDelay;
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>     public PojoTimestampExtractor(long maxDelay) {
> >>>>>>>>>>>>>         this.maxDelay = maxDelay;
> >>>>>>>>>>>>>     }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>     @Override
> >>>>>>>>>>>>>     public long extractTimestamp(Pojo pojo, long l) {
> >>>>>>>>>>>>>         return pojo.getTime();
> >>>>>>>>>>>>>     }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>     @Override
> >>>>>>>>>>>>>     public long extractWatermark(Pojo pojo, long l) {
> >>>>>>>>>>>>>         return pojo.getTime() - maxDelay;
> >>>>>>>>>>>>>     }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>     @Override
> >>>>>>>>>>>>>     public long getCurrentWatermark() {
> >>>>>>>>>>>>>         return Long.MIN_VALUE;
> >>>>>>>>>>>>>     }
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> >>>>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> >>>>>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> >>>>>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
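The two symptoms in the original report (event-time windows never firing when reading from Kafka with parallelism greater than 1, yet firing with fromCollection(...)) can be reproduced with a toy model of an event-time tumbling-window sum. A minimal sketch, assuming a simplified firing rule (a window [start, end) fires once the watermark reaches end; Flink's exact boundary handling may differ) and modeling "the program closes" as a plain close() call; TumblingSumModel is a hypothetical name, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical toy model (not Flink's window operator) of timeWindowAll(...).sum(...)
 * in event time. Simplified firing rule: window [start, start + size) fires once the
 * watermark reaches start + size; close() flushes all buffered windows, like a finite
 * source such as fromCollection(...) finishing the program.
 */
class TumblingSumModel {
    private final long size;
    private final TreeMap<Long, Long> sums = new TreeMap<>(); // window start -> running sum
    final List<String> fired = new ArrayList<>();

    TumblingSumModel(long size) { this.size = size; }

    void onElement(long timestamp, long value) {
        long start = timestamp - Math.floorMod(timestamp, size); // align to the window size
        sums.merge(start, value, Long::sum);
    }

    void onWatermark(long watermark) {
        // Fire buffered windows in order while their end is covered by the watermark.
        while (!sums.isEmpty() && sums.firstKey() + size <= watermark) {
            emit(sums.pollFirstEntry());
        }
    }

    void close() {
        while (!sums.isEmpty()) {
            emit(sums.pollFirstEntry());
        }
    }

    private void emit(Map.Entry<Long, Long> window) {
        long start = window.getKey();
        fired.add("[" + start + ", " + (start + size) + ") sum=" + window.getValue());
    }

    public static void main(String[] args) {
        TumblingSumModel model = new TumblingSumModel(1000L);
        model.onElement(1_500L, 1L);
        model.onElement(1_700L, 2L);
        // Combined watermark held at Long.MIN_VALUE by an instance that never sees data:
        model.onWatermark(Long.MIN_VALUE);
        System.out.println(model.fired); // []
        // A finite source ends, the program closes, and buffered windows are emitted:
        model.close();
        System.out.println(model.fired); // [[1000, 2000) sum=3]
    }
}
```

In the Kafka case the source never finishes, so without an advancing watermark the equivalent of close() is never reached and the buffered windows stay pending.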