Hi Aljoscha, sure, will do. I haven't found a solution either. I won't have time to put a minimal example together before the weekend, though.
Cheers,

Konstantin

On 25.11.2015 19:10, Aljoscha Krettek wrote:
> Hi Konstantin,
> I still haven't come up with an explanation for the behavior. Could you maybe
> send me example code (and example data, if it is necessary to reproduce the
> problem)? This would really help me pinpoint the problem.
>
> Cheers,
> Aljoscha
>> On 17 Nov 2015, at 21:42, Konstantin Knauf <konstantin.kn...@tngtech.com>
>> wrote:
>>
>> Hi Aljoscha,
>>
>> Are you sure? I am running the job from my IDE at the moment.
>>
>> If I set
>>
>> StreamExecutionEnvironment.setParallelism(1);
>>
>> it works with the old TimestampExtractor (returning Long.MIN_VALUE from
>> getCurrentWatermark() and emitting a watermark at every record).
>>
>> If I set
>>
>> StreamExecutionEnvironment.setParallelism(5);
>>
>> it does not work.
>>
>> So, if I understood you correctly, it is the opposite of what you were
>> expecting?!
>>
>> Cheers,
>>
>> Konstantin
>>
>>
>> On 17.11.2015 11:32, Aljoscha Krettek wrote:
>>> Hi,
>>> actually, the bug is more subtle. Normally, it is not a problem that the
>>> TimestampExtractor sometimes emits a watermark that is lower than the one
>>> before. (This is the result of the bug with Long.MIN_VALUE I mentioned
>>> before.) The stream operators wait for watermarks from all upstream
>>> operators and only advance the watermark monotonically in lockstep with
>>> them. This way, the watermark cannot decrease at an operator.
>>>
>>> In your case, you have a topology with parallelism 1, I assume. In that
>>> case the operators are chained. (There are no separate operators but
>>> basically only one operator, and element transmission happens in function
>>> calls.) In this setting the watermarks are directly forwarded to operators
>>> without going through the logic I mentioned above.
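The lockstep behaviour described above (an operator waits for watermarks from all upstream inputs and only ever moves its own watermark forward) can be sketched in plain Java. This is a toy model for illustration only, not Flink's implementation; the class name `InputWatermarkTracker` and its methods are hypothetical.

```java
import java.util.Arrays;

/** Hypothetical toy model: tracks one watermark per input channel. */
class InputWatermarkTracker {
    private final long[] channelWatermarks;
    private long current = Long.MIN_VALUE;

    InputWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        // No channel has reported a watermark yet.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /**
     * Records a watermark from one channel.
     * Returns true only if the operator's own watermark advanced.
     */
    boolean onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = watermark;
        // The operator can only be as far along as its slowest input.
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > current) {
            current = min;
            return true;
        }
        // A lower (or equal) minimum never moves the watermark backwards.
        return false;
    }

    long current() {
        return current;
    }
}
```

In this model a channel that reports a lower watermark than before has no visible effect, which matches the point that a decreasing watermark is normally harmless. A chained topology, as described above, would bypass a check of this kind.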
>>>
>>> Cheers,
>>> Aljoscha
>>>> On 16 Nov 2015, at 18:13, Konstantin Knauf <konstantin.kn...@tngtech.com>
>>>> wrote:
>>>>
>>>> Hi Aljoscha,
>>>>
>>>> I changed the TimestampExtractor to save the last seen timestamp and only
>>>> emit with getCurrentWatermark [1], as you suggested. So basically I do
>>>> the opposite of before (only watermarks per event vs. only watermarks
>>>> per auto-watermark). And now it works :). The question remains why it
>>>> did not work before. As far as I can see, it is an issue with the first
>>>> TimestampExtractor itself?!
>>>>
>>>> Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>> [1]
>>>>
>>>> final private long maxDelay;
>>>> private long lastTimestamp = Long.MIN_VALUE;
>>>>
>>>> public PojoTimestampExtractor(long maxDelay) {
>>>>     this.maxDelay = maxDelay;
>>>> }
>>>>
>>>> @Override
>>>> public long extractTimestamp(Pojo pojo, long l) {
>>>>     lastTimestamp = pojo.getTime();
>>>>     return pojo.getTime();
>>>> }
>>>>
>>>> @Override
>>>> public long extractWatermark(Pojo pojo, long l) {
>>>>     return Long.MIN_VALUE;
>>>> }
>>>>
>>>> @Override
>>>> public long getCurrentWatermark() {
>>>>     return lastTimestamp - maxDelay;
>>>> }
>>>>
>>>>
>>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
>>>>> Hi,
>>>>> yes, at your data rate emitting a watermark for every element should not
>>>>> be a problem. It could become a problem with higher data rates, since the
>>>>> system can get overwhelmed if every element also generates a watermark.
>>>>> In that case I would suggest storing the latest element timestamp in an
>>>>> internal field and only emitting in getCurrentWatermark(), since then
>>>>> the watermark interval can be tuned using the auto-watermark
>>>>> interval setting.
>>>>>
>>>>> But that should not be the cause of the problem that you currently have.
>>>>> Would you maybe be willing to send me some (mock) example data and the
>>>>> code, so that I can reproduce the problem and have a look at it? Please
>>>>> send it to aljoscha at apache.org.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf
>>>>>> <konstantin.kn...@tngtech.com> wrote:
>>>>>>
>>>>>> Hi Aljoscha,
>>>>>>
>>>>>> ok, now I at least understand why it works with fromElements(...). For
>>>>>> the rest I am not so sure.
>>>>>>
>>>>>>> What this means in your case is that the watermark can only advance if
>>>>>>> a new element arrives, because only then is the watermark updated.
>>>>>>
>>>>>> But new elements arrive all the time, about 50/s, or do you mean
>>>>>> something else?
>>>>>>
>>>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an ok
>>>>>> choice, if I understand the semantics correctly. It just affects
>>>>>> watermarking in the absence of events, right?
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Konstantin
>>>>>>
>>>>>>
>>>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>>>>>>> Hi,
>>>>>>> it could be what Gyula mentioned. Let me first go a bit into how the
>>>>>>> TimestampExtractor works internally.
>>>>>>>
>>>>>>> First, the timestamp extractor internally keeps the value of the last
>>>>>>> emitted watermark. Then, the semantics of the TimestampExtractor are as
>>>>>>> follows:
>>>>>>> - the result of extractTimestamp is taken and it replaces the internal
>>>>>>>   timestamp of the element
>>>>>>> - if the result of extractWatermark is larger than the last watermark,
>>>>>>>   the new value is emitted as a watermark and the value is stored
>>>>>>> - getCurrentWatermark is called on the specified auto-watermark
>>>>>>>   interval; if the returned value is larger than the last watermark, it
>>>>>>>   is emitted and stored as the last watermark
>>>>>>>
>>>>>>> What this means in your case is that the watermark can only advance if
>>>>>>> a new element arrives, because only then is the watermark updated.
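The bookkeeping in the list above can be sketched in plain Java. This is a toy model written for illustration, not Flink's implementation: `SimpleExtractor`, `ExtractorDriver`, and `PerElementExtractor` are hypothetical names, elements are reduced to their bare timestamps, and the extractor mirrors the one in [1] of the original post with an assumed delay of 100 ms.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the three methods discussed in this thread. */
interface SimpleExtractor {
    long extractTimestamp(long elementTime);
    long extractWatermark(long elementTime);
    long getCurrentWatermark();
}

/** Mirrors the extractor from the original post: per-element watermarks only. */
class PerElementExtractor implements SimpleExtractor {
    private final long maxDelay;

    PerElementExtractor(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    public long extractTimestamp(long elementTime) {
        return elementTime;
    }

    public long extractWatermark(long elementTime) {
        return elementTime - maxDelay;
    }

    public long getCurrentWatermark() {
        return Long.MIN_VALUE;
    }
}

/** Toy driver: a watermark is emitted only if it exceeds the last emitted one. */
class ExtractorDriver {
    private final SimpleExtractor extractor;
    private long lastWatermark = Long.MIN_VALUE;
    final List<Long> emitted = new ArrayList<>();

    ExtractorDriver(SimpleExtractor extractor) {
        this.extractor = extractor;
    }

    /** Called for every element; returns the timestamp assigned to it. */
    long onElement(long elementTime) {
        long timestamp = extractor.extractTimestamp(elementTime);
        maybeEmit(extractor.extractWatermark(elementTime));
        return timestamp;
    }

    /** Called once per auto-watermark interval. */
    void onTimer() {
        maybeEmit(extractor.getCurrentWatermark());
    }

    private void maybeEmit(long candidate) {
        if (candidate > lastWatermark) {
            lastWatermark = candidate;
            emitted.add(candidate);
        }
    }

    public static void main(String[] args) {
        ExtractorDriver driver = new ExtractorDriver(new PerElementExtractor(100));
        driver.onElement(1000);
        driver.onTimer();        // Long.MIN_VALUE is never larger, so nothing is emitted
        driver.onElement(2000);
        driver.onElement(1500);  // out-of-order element: 1400 is below 1900, not emitted
        driver.onTimer();
        System.out.println(driver.emitted); // prints [900, 1900]
    }
}
```

Under these semantics, returning Long.MIN_VALUE from getCurrentWatermark() simply means the periodic call never contributes a watermark, so progress depends entirely on arriving elements.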
>>>>>>>
>>>>>>> The reason why you see results if you use fromElements is that the
>>>>>>> window operator also emits all the windows that it currently has
>>>>>>> buffered when the program closes. This happens in the case of
>>>>>>> fromElements because only a finite number of elements is emitted, after
>>>>>>> which the source closes, thereby finishing the whole program.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Could this part of the extractor be the problem, Aljoscha?
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public long getCurrentWatermark() {
>>>>>>>>     return Long.MIN_VALUE;
>>>>>>>> }
>>>>>>>>
>>>>>>>> Gyula
>>>>>>>>
>>>>>>>> Konstantin Knauf <konstantin.kn...@tngtech.com> wrote (on
>>>>>>>> Mon, 16 Nov 2015, 10:39):
>>>>>>>> Hi Aljoscha,
>>>>>>>>
>>>>>>>> thanks for your answer. Yes, I am using the same
>>>>>>>> TimestampExtractor class.
>>>>>>>>
>>>>>>>> The timestamps look good to me. Here is an example:
>>>>>>>>
>>>>>>>> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
>>>>>>>>
>>>>>>>> The order now is
>>>>>>>>
>>>>>>>> stream
>>>>>>>>     .map(dummyMapper)
>>>>>>>>     .assignTimestamps(...)
>>>>>>>>     .timeWindow(...)
>>>>>>>>
>>>>>>>> Is there a way to print out the assigned timestamps after
>>>>>>>> stream.assignTimestamps(...)?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>>
>>>>>>>> Konstantin
>>>>>>>>
>>>>>>>>
>>>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>>>>>>>> Hi,
>>>>>>>>> are you also using the timestamp extractor when you are using
>>>>>>>>> env.fromCollection()?
>>>>>>>>>
>>>>>>>>> Could you maybe insert a dummy mapper after the Kafka source that
>>>>>>>>> just prints the element and forwards it? That would show whether the
>>>>>>>>> elements come with a good timestamp from Kafka.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf
>>>>>>>>>> <konstantin.kn...@tngtech.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi everyone,
>>>>>>>>>>
>>>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>>>>>>>
>>>>>>>>>> I am using a very simple TimestampExtractor like [1], which just
>>>>>>>>>> extracts a millis timestamp from a POJO. In my streaming job, I read
>>>>>>>>>> in these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>>>>>>>
>>>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<>(
>>>>>>>>>>     parameterTool.getRequired("topic"),
>>>>>>>>>>     new AvroPojoDeserializationSchema(),
>>>>>>>>>>     parameterTool.getProperties()));
>>>>>>>>>>
>>>>>>>>>> I have timestampEnabled() and the TimeCharacteristic is EventTime,
>>>>>>>>>> AutoWatermarkInterval is 500.
>>>>>>>>>>
>>>>>>>>>> The problem is, when I do something like:
>>>>>>>>>>
>>>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>>>>>>>     .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>>>>>>>>>>     .sum(..)
>>>>>>>>>>     .print();
>>>>>>>>>>
>>>>>>>>>> env.execute();
>>>>>>>>>>
>>>>>>>>>> the windows never get triggered.
>>>>>>>>>>
>>>>>>>>>> If I use ProcessingTime it works.
>>>>>>>>>> If I use env.fromCollection(...) instead of the KafkaSource it works
>>>>>>>>>> with EventTime, too.
>>>>>>>>>>
>>>>>>>>>> Any ideas what I could be doing wrong are highly appreciated.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>>
>>>>>>>>>> Konstantin
>>>>>>>>>>
>>>>>>>>>> [1]:
>>>>>>>>>>
>>>>>>>>>> public class PojoTimestampExtractor implements
>>>>>>>>>>         TimestampExtractor<Pojo> {
>>>>>>>>>>
>>>>>>>>>>     final private long maxDelay;
>>>>>>>>>>
>>>>>>>>>>     public PojoTimestampExtractor(long maxDelay) {
>>>>>>>>>>         this.maxDelay = maxDelay;
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public long extractTimestamp(Pojo pojo, long l) {
>>>>>>>>>>         return pojo.getTime();
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public long extractWatermark(Pojo pojo, long l) {
>>>>>>>>>>         return pojo.getTime() - maxDelay;
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public long getCurrentWatermark() {
>>>>>>>>>>         return Long.MIN_VALUE;
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>
>>>>>>>> --
>>>>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082