Maybe. In the Kafka case we just need to ensure that parallel instances of the 
source that know that they don’t have any partitions assigned to them emit 
Long.MAX_VALUE as a watermark.
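A minimal sketch of this idea in plain Java. It uses a simplified model, not Flink's real source interfaces. Each parallel source instance reports a watermark, and an instance that knows it has no partitions assigned reports Long.MAX_VALUE, so it never holds back the downstream minimum. `IdleAwareSource` and its methods are hypothetical names.

```java
import java.util.Arrays;

// Hypothetical model of one parallel source instance (not Flink API).
final class IdleAwareSource {
    private final boolean hasAssignedPartitions;
    private final long maxDelay;
    private long maxTimestamp = Long.MIN_VALUE;
    private boolean seenElement = false;

    IdleAwareSource(boolean hasAssignedPartitions, long maxDelay) {
        this.hasAssignedPartitions = hasAssignedPartitions;
        this.maxDelay = maxDelay;
    }

    void onElement(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        seenElement = true;
    }

    long currentWatermark() {
        if (!hasAssignedPartitions) {
            return Long.MAX_VALUE; // will never receive data, so never holds back downstream
        }
        if (!seenElement) {
            return Long.MIN_VALUE; // has partitions but no data yet: must not advance
        }
        return maxTimestamp - maxDelay;
    }

    // The downstream operator's watermark is the minimum over all instances.
    static long downstreamWatermark(IdleAwareSource... instances) {
        return Arrays.stream(instances)
                .mapToLong(IdleAwareSource::currentWatermark)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```

With one instance reading the single partition and the others known to be idle, the downstream watermark follows the active instance. An instance that has partitions but has not seen any data still pins the watermark at Long.MIN_VALUE, which is the general, still-open problem of sources that never receive elements.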

> On 30 Nov 2015, at 17:50, Gyula Fóra <gyula.f...@gmail.com> wrote:
> 
> Hi,
> 
> I think what we will need at some point for this are approximate watermarks
> that correlate event time and ingestion time.
> 
> I think they have similar concepts in MillWheel/Dataflow.
> 
> Cheers,
> Gyula
> On Mon, Nov 30, 2015 at 5:29 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> As an addition: I don't have a solution yet for the general problem of what
> happens when a parallel instance of a source never receives elements. This
> watermark business is very tricky...
> 
> Cheers,
> Aljoscha
> > On 30 Nov 2015, at 17:20, Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> > Hi Konstantin,
> > I finally nailed down the problem. :-)
> >
> > The root of the problem is a mismatch between the parallelism of the
> > Flink Kafka Consumer and the number of partitions in the Kafka topic. I
> > would assume that in your case the Kafka topic has 1 partition. This
> > means that only one of the parallel instances of the Flink Kafka Consumer
> > ever receives elements, which in turn means that only one of the parallel
> > instances of the timestamp extractor ever receives elements. Therefore,
> > the other parallel instances never emit a watermark, and the watermark
> > does not advance downstream, because the watermark at an operator is the
> > minimum over all upstream watermarks. This explains why
> > ExampleTimestampExtractor1 only works with parallelism=1.
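To make the stall concrete, here is a small plain-Java sketch of an operator that takes the minimum over the latest watermark from each upstream instance. It is a simplified model, not Flink's internal code, and it assumes parallelism 5 with a single Kafka partition, so only instance 0 ever advances. The class name `MinOverUpstream` is hypothetical.

```java
import java.util.Arrays;

// Simplified model: an operator's watermark is the minimum over the latest
// watermark received from each upstream parallel instance.
final class MinOverUpstream {
    static long operatorWatermark(long[] upstreamWatermarks) {
        return Arrays.stream(upstreamWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long[] upstream = new long[5];
        Arrays.fill(upstream, Long.MIN_VALUE); // instances 1..4 never see an element
        upstream[0] = 1447666531260L;          // instance 0 reads the only partition
        // Prints -9223372036854775808 (Long.MIN_VALUE): no event-time window can fire.
        System.out.println(operatorWatermark(upstream));
    }
}
```

With a single upstream instance, the minimum is simply that instance's watermark, so the same extractor behaves as expected at parallelism 1.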
> >
> > The reason why ExampleTimestampExtractor2 works in all parallelism settings 
> > is not very obvious. The secret is in this method:
> >
> > @Override
> > public long getCurrentWatermark() {
> >   return lastTimestamp - maxDelay;
> > }
> >
> > In the parallel instances that never receive any element, lastTimestamp
> > stays at its initial value, Long.MIN_VALUE. Because of integer underflow,
> > "lastTimestamp - maxDelay" therefore wraps around to
> > Long.MAX_VALUE - maxDelay + 1. Now, because the watermark at an operator
> > is always the minimum over all watermarks from upstream operators, the
> > watermark at the window operator always tracks the watermark of the
> > parallel instance that receives elements.
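The wrap-around can be checked in plain Java. This sketch assumes maxDelay = 6000, the value passed to PojoTimestampExtractor in the original question, and contrasts the silently wrapping subtraction with `Math.subtractExact`, which throws on overflow. The class and method names are hypothetical.

```java
// Shows why "lastTimestamp - maxDelay" yields a huge watermark for an
// instance that never saw an element (lastTimestamp == Long.MIN_VALUE).
final class UnderflowDemo {
    static long wrappedWatermark(long lastTimestamp, long maxDelay) {
        return lastTimestamp - maxDelay; // two's-complement wrap-around, no error
    }

    static long checkedWatermark(long lastTimestamp, long maxDelay) {
        return Math.subtractExact(lastTimestamp, maxDelay); // throws ArithmeticException on overflow
    }

    public static void main(String[] args) {
        long maxDelay = 6000L;
        long wm = wrappedWatermark(Long.MIN_VALUE, maxDelay);
        System.out.println(wm == Long.MAX_VALUE - maxDelay + 1); // true
        try {
            checkedWatermark(Long.MIN_VALUE, maxDelay);
        } catch (ArithmeticException e) {
            System.out.println("underflow detected");
        }
    }
}
```

Relying on the wrap-around makes the idle instances "work" only by accident; checking explicitly whether any element has been seen states the intent directly.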
> >
> > I hope this helps, but please let me know if I should provide more 
> > explanation. This is a very tricky topic.
> >
> > Cheers,
> > Aljoscha
> >
> >> On 29 Nov 2015, at 21:18, Konstantin Knauf <konstantin.kn...@tngtech.com> 
> >> wrote:
> >>
> >> Hi Aljoscha,
> >>
> >> I have put together a gist [1] with two classes: a short processing
> >> pipeline that shows the behavior, and a data generator that writes
> >> records into Kafka. I hope I remembered everything we discussed correctly.
> >>
> >> So basically, in the example it works with "TimestampExtractor1" only for
> >> parallelism 1, while with "TimestampExtractor2" it works regardless of the
> >> parallelism. I ran it from the IDE.
> >>
> >> Let me know if you need anything else.
> >>
> >> Cheers,
> >>
> >> Konstantin
> >>
> >> [1] https://gist.github.com/knaufk/d57b5c3c7db576f3350d
> >>
> >> On 25.11.2015 21:15, Konstantin Knauf wrote:
> >>> Hi Aljoscha,
> >>>
> >>> Sure, will do. I haven't found a solution either. I won't have time to
> >>> put a minimal example together before the weekend, though.
> >>>
> >>> Cheers,
> >>>
> >>> Konstantin
> >>>
> >>> On 25.11.2015 19:10, Aljoscha Krettek wrote:
> >>>> Hi Konstantin,
> >>>> I still haven't come up with an explanation for the behavior. Could you
> >>>> maybe send me example code (and example data, if it is necessary to
> >>>> reproduce the problem)? This would really help me pinpoint the problem.
> >>>>
> >>>> Cheers,
> >>>> Aljoscha
> >>>>> On 17 Nov 2015, at 21:42, Konstantin Knauf 
> >>>>> <konstantin.kn...@tngtech.com> wrote:
> >>>>>
> >>>>> Hi Aljoscha,
> >>>>>
> >>>>> Are you sure? I am running the job from my IDE at the moment.
> >>>>>
> >>>>> If I set
> >>>>>
> >>>>> StreamExecutionEnvironment.setParallelism(1);
> >>>>>
> >>>>> it works with the old TimestampExtractor (returning Long.MIN_VALUE from
> >>>>> getCurrentWatermark() and emitting a watermark at every record).
> >>>>>
> >>>>> If I set
> >>>>>
> >>>>> StreamExecutionEnvironment.setParallelism(5);
> >>>>>
> >>>>> it does not work.
> >>>>>
> >>>>> So, if I understood you correctly, it is the opposite of what you were
> >>>>> expecting?!
> >>>>>
> >>>>> Cheers,
> >>>>>
> >>>>> Konstantin
> >>>>>
> >>>>>
> >>>>> On 17.11.2015 11:32, Aljoscha Krettek wrote:
> >>>>>> Hi,
> >>>>>> Actually, the bug is more subtle. Normally, it is not a problem that
> >>>>>> the TimestampExtractor sometimes emits a watermark that is lower than
> >>>>>> the one before. (This is the result of the Long.MIN_VALUE bug I
> >>>>>> mentioned before.) The stream operators wait for watermarks from all
> >>>>>> upstream operators and only advance the watermark monotonically, in
> >>>>>> lockstep with them. This way, the watermark cannot decrease at an
> >>>>>> operator.
> >>>>>>
> >>>>>> In your case, I assume you have a topology with parallelism 1. In
> >>>>>> that case the operators are chained: there are no separate operators,
> >>>>>> basically only one operator, and element transmission happens via
> >>>>>> function calls. In this setting the watermarks are forwarded directly
> >>>>>> to the operators without going through the logic I mentioned above.
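A rough model of the two paths, assuming the behavior described in this message rather than Flink's actual classes. In the non-chained case, a per-input "valve" keeps the highest watermark seen on each channel and forwards the minimum only when it advances; in the chained case, a watermark is handed straight to the next operator, even if it is lower than the previous one. `WatermarkPaths`, `Valve`, and `chainedForward` are hypothetical names.

```java
import java.util.Arrays;
import java.util.OptionalLong;

final class WatermarkPaths {
    // Non-chained path: per-channel state and a monotonic output.
    static final class Valve {
        private final long[] channelWatermarks;
        private long lastForwarded = Long.MIN_VALUE;

        Valve(int numChannels) {
            channelWatermarks = new long[numChannels];
            Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        }

        /** Returns the watermark forwarded downstream, or empty if nothing is forwarded. */
        OptionalLong onWatermark(int channel, long watermark) {
            // A lower watermark on a channel is ignored, so channel state never decreases.
            channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
            long min = Arrays.stream(channelWatermarks).min().getAsLong();
            if (min > lastForwarded) {
                lastForwarded = min;
                return OptionalLong.of(min);
            }
            return OptionalLong.empty();
        }
    }

    // Chained path: no combining logic; whatever arrives is passed on as-is.
    static long chainedForward(long watermark) {
        return watermark;
    }
}
```

The two-channel case also shows the stall from the Kafka discussion: as long as one channel stays at Long.MIN_VALUE, the valve forwards nothing.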
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Aljoscha
> >>>>>>> On 16 Nov 2015, at 18:13, Konstantin Knauf 
> >>>>>>> <konstantin.kn...@tngtech.com> wrote:
> >>>>>>>
> >>>>>>> Hi Aljoscha,
> >>>>>>>
> >>>>>>> I changed the timestamp extractor to save the last seen timestamp and
> >>>>>>> only emit watermarks from getCurrentWatermark() [1], as you suggested.
> >>>>>>> So basically I now do the opposite of what I did before (before:
> >>>>>>> watermarks only per event; now: watermarks only per auto-watermark
> >>>>>>> interval). And now it works :). The question remains why it did not
> >>>>>>> work before. As far as I can see, it is an issue with the first
> >>>>>>> TimestampExtractor itself?!
> >>>>>>>
> >>>>>>> Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>>
> >>>>>>> Konstantin
> >>>>>>>
> >>>>>>> [1]
> >>>>>>>
> >>>>>>> final private long maxDelay;
> >>>>>>> private long lastTimestamp = Long.MIN_VALUE;
> >>>>>>>
> >>>>>>> public PojoTimestampExtractor(long maxDelay) {
> >>>>>>>     this.maxDelay = maxDelay;
> >>>>>>> }
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> public long extractTimestamp(Pojo pojo, long l) {
> >>>>>>>     lastTimestamp = pojo.getTime();
> >>>>>>>     return pojo.getTime();
> >>>>>>> }
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> public long extractWatermark(Pojo pojo, long l) {
> >>>>>>>     return Long.MIN_VALUE;
> >>>>>>> }
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> public long getCurrentWatermark() {
> >>>>>>>     return lastTimestamp - maxDelay;
> >>>>>>> }
> >>>>>>>
> >>>>>>>
> >>>>>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
> >>>>>>>> Hi,
> >>>>>>>> Yes, at your data rate emitting a watermark for every element should
> >>>>>>>> not be a problem. It could become a problem with higher data rates,
> >>>>>>>> since the system can get overwhelmed if every element also generates
> >>>>>>>> a watermark. In that case I would suggest storing the latest
> >>>>>>>> element timestamp in an internal field and only emitting watermarks
> >>>>>>>> in getCurrentWatermark(), since then the watermark interval can be
> >>>>>>>> tuned using the auto-watermark interval setting.
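A minimal sketch of the suggested pattern in plain Java. It assumes a simplified model in which something calls `onInterval()` once per auto-watermark interval (500 ms in this thread): the per-element path only updates a field, and the watermark is derived on the timer. `PeriodicWatermarkTracker` and its methods are hypothetical, not Flink API. It tracks the maximum timestamp seen and emits nothing until an element has arrived.

```java
import java.util.OptionalLong;

final class PeriodicWatermarkTracker {
    private final long maxDelay;
    private long maxTimestamp = Long.MIN_VALUE;
    private boolean seenElement = false;
    private long lastEmitted = Long.MIN_VALUE;

    PeriodicWatermarkTracker(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    /** Per element: a cheap field update; no watermark is emitted here. */
    void onElement(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        seenElement = true;
    }

    /** Once per auto-watermark interval: emit only if the watermark advances. */
    OptionalLong onInterval() {
        if (!seenElement) {
            return OptionalLong.empty(); // nothing seen yet; avoids Long.MIN_VALUE - maxDelay
        }
        long candidate = maxTimestamp - maxDelay;
        if (candidate <= lastEmitted) {
            return OptionalLong.empty();
        }
        lastEmitted = candidate;
        return OptionalLong.of(candidate);
    }
}
```

At about 50 records/s and a 500 ms interval, this produces at most two watermarks per second instead of roughly fifty.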
> >>>>>>>>
> >>>>>>>> But that should not be the cause of the problem that you currently
> >>>>>>>> have. Would you be willing to send me some (mock) example data and
> >>>>>>>> the code so that I can reproduce the problem and have a look at it?
> >>>>>>>> Please send it to aljoscha at apache.org.
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Aljoscha
> >>>>>>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf 
> >>>>>>>>> <konstantin.kn...@tngtech.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Aljoscha,
> >>>>>>>>>
> >>>>>>>>> OK, now I at least understand why it works with fromElements(...).
> >>>>>>>>> For the rest, I am not so sure.
> >>>>>>>>>
> >>>>>>>>>> What this means in your case is that the watermark can only
> >>>>>>>>>> advance if a new element arrives, because only then is the
> >>>>>>>>>> watermark updated.
> >>>>>>>>>
> >>>>>>>>> But new elements arrive all the time, about 50/s, or do you mean
> >>>>>>>>> something else?
> >>>>>>>>>
> >>>>>>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an OK
> >>>>>>>>> choice, if I understand the semantics correctly. It just affects
> >>>>>>>>> watermarking in the absence of events, right?
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>>
> >>>>>>>>> Konstantin
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
> >>>>>>>>>> Hi,
> >>>>>>>>>> It could be what Gyula mentioned. Let me first explain a bit how
> >>>>>>>>>> the TimestampExtractor works internally.
> >>>>>>>>>>
> >>>>>>>>>> First, the timestamp extractor internally keeps the value of the
> >>>>>>>>>> last emitted watermark. Then, the semantics of the
> >>>>>>>>>> TimestampExtractor are as follows:
> >>>>>>>>>> - the result of extractTimestamp is taken and it replaces the
> >>>>>>>>>>   internal timestamp of the element
> >>>>>>>>>> - if the result of extractWatermark is larger than the last
> >>>>>>>>>>   watermark, the new value is emitted as a watermark and stored
> >>>>>>>>>> - getCurrentWatermark is called at the specified auto-watermark
> >>>>>>>>>>   interval; if the returned value is larger than the last
> >>>>>>>>>>   watermark, it is emitted and stored as the last watermark
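The rules above can be modeled in a few lines of plain Java. This is a hypothetical sketch of the described semantics, not the actual Flink implementation, and it leaves out the first rule (timestamp replacement). It shows that a getCurrentWatermark() that always returns Long.MIN_VALUE never emits anything, so only per-element watermarks can move the clock.

```java
import java.util.OptionalLong;

// Hypothetical model of the extractor's "emit only if larger" bookkeeping.
final class ExtractorModel {
    private long lastEmittedWatermark = Long.MIN_VALUE;

    /** Second rule: result of extractWatermark(), called for every element. */
    OptionalLong onElementWatermark(long extractedWatermark) {
        return tryEmit(extractedWatermark);
    }

    /** Third rule: result of getCurrentWatermark(), called every auto-watermark interval. */
    OptionalLong onTimer(long currentWatermark) {
        return tryEmit(currentWatermark);
    }

    private OptionalLong tryEmit(long candidate) {
        if (candidate > lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
            return OptionalLong.of(candidate);
        }
        return OptionalLong.empty(); // not larger than the last one: nothing is emitted
    }
}
```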
> >>>>>>>>>>
> >>>>>>>>>> What this means in your case is that the watermark can only 
> >>>>>>>>>> advance if a new element arrives, because only then is the 
> >>>>>>>>>> watermark updated.
> >>>>>>>>>>
> >>>>>>>>>> The reason why you see results if you use fromElements is that the
> >>>>>>>>>> window operator also emits all the windows that it currently has
> >>>>>>>>>> buffered when the program finishes. This happens in the case of
> >>>>>>>>>> fromElements because only a finite number of elements is emitted,
> >>>>>>>>>> after which the source closes, thereby finishing the whole program.
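A simplified sketch of that behavior, assuming tumbling event-time windows that fire once the watermark reaches the window's last timestamp (end - 1), plus a flush of all buffered windows on close. `TumblingWindowBuffer` is a hypothetical class, not Flink's window operator; it only illustrates why a finite source produces output even when the watermark never advances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class TumblingWindowBuffer {
    private final long sizeMillis;
    private final TreeMap<Long, Long> sumByWindowStart = new TreeMap<>();
    final List<String> fired = new ArrayList<>();

    TumblingWindowBuffer(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    void onElement(long timestamp, long value) {
        long start = timestamp - Math.floorMod(timestamp, sizeMillis);
        sumByWindowStart.merge(start, value, Long::sum);
    }

    /** Fires every buffered window whose last timestamp is <= the watermark. */
    void onWatermark(long watermark) {
        while (!sumByWindowStart.isEmpty()
                && sumByWindowStart.firstKey() + sizeMillis - 1 <= watermark) {
            fire(sumByWindowStart.firstKey());
        }
    }

    /** End of input (e.g. a finite fromElements source): flush everything. */
    void close() {
        while (!sumByWindowStart.isEmpty()) {
            fire(sumByWindowStart.firstKey());
        }
    }

    private void fire(long start) {
        long sum = sumByWindowStart.remove(start);
        fired.add("[" + start + ", " + (start + sizeMillis) + ") sum=" + sum);
    }
}
```

With the watermark stuck at Long.MIN_VALUE, `onWatermark` never fires anything, and only `close()` produces results, which matches the fromElements observation.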
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Aljoscha
> >>>>>>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Could this part of the extractor be the problem, Aljoscha?
> >>>>>>>>>>>
> >>>>>>>>>>> @Override
> >>>>>>>>>>> public long getCurrentWatermark() {
> >>>>>>>>>>>    return Long.MIN_VALUE;
> >>>>>>>>>>> }
> >>>>>>>>>>>
> >>>>>>>>>>> Gyula
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Nov 16, 2015 at 10:39 AM Konstantin Knauf <konstantin.kn...@tngtech.com> wrote:
> >>>>>>>>>>> Hi Aljoscha,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for your answer. Yes, I am using the same
> >>>>>>>>>>> TimestampExtractor class.
> >>>>>>>>>>>
> >>>>>>>>>>> The timestamps look good to me. Here is an example:
> >>>>>>>>>>>
> >>>>>>>>>>> {"time": 1447666537260, ...}, parsed: 2015-11-16T10:35:37.260+01:00
> >>>>>>>>>>>
> >>>>>>>>>>> The order now is
> >>>>>>>>>>>
> >>>>>>>>>>> stream
> >>>>>>>>>>> .map(dummyMapper)
> >>>>>>>>>>> .assignTimestamps(...)
> >>>>>>>>>>> .timeWindow(...)
> >>>>>>>>>>>
> >>>>>>>>>>> Is there a way to print out the assigned timestamps after
> >>>>>>>>>>> stream.assignTimestamps(...)?
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>>
> >>>>>>>>>>> Konstantin
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
> >>>>>>>>>>>> Hi,
> >>>>>>>>>>>> Are you also using the timestamp extractor when you are using
> >>>>>>>>>>>> env.fromCollection()?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Could you maybe insert a dummy mapper after the Kafka source
> >>>>>>>>>>>> that just prints the element and forwards it, to see whether the
> >>>>>>>>>>>> elements come with a good timestamp from Kafka?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf 
> >>>>>>>>>>>>> <konstantin.kn...@tngtech.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I am using a very simple TimestampExtractor like [1], which just
> >>>>>>>>>>>>> extracts a millis timestamp from a POJO. In my streaming job, I
> >>>>>>>>>>>>> read in these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<>(
> >>>>>>>>>>>>>         parameterTool.getRequired("topic"),
> >>>>>>>>>>>>>         new AvroPojoDeserializationSchema(),
> >>>>>>>>>>>>>         parameterTool.getProperties()));
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I have timestamps enabled, the TimeCharacteristic is EventTime,
> >>>>>>>>>>>>> and the auto-watermark interval is 500 ms.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The problem is, when I do something like:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
> >>>>>>>>>>>>>       .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
> >>>>>>>>>>>>>       .sum(..)
> >>>>>>>>>>>>>       .print();
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> env.execute();
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> the windows never get triggered.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> If I use ProcessingTime, it works.
> >>>>>>>>>>>>> If I use env.fromCollection(...) instead of the Kafka source, it
> >>>>>>>>>>>>> works with EventTime, too.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Any ideas about what I could be doing wrong would be highly appreciated.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Konstantin
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1]:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> final private long maxDelay;
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> public PojoTimestampExtractor(long maxDelay) {
> >>>>>>>>>>>>>   this.maxDelay = maxDelay;
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> // Element timestamp: the POJO's millisecond event time.
> >>>>>>>>>>>>> @Override
> >>>>>>>>>>>>> public long extractTimestamp(Pojo pojo, long l) {
> >>>>>>>>>>>>>   return pojo.getTime();
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> // Per-element watermark: event time minus the allowed delay.
> >>>>>>>>>>>>> @Override
> >>>>>>>>>>>>> public long extractWatermark(Pojo pojo, long l) {
> >>>>>>>>>>>>>   return pojo.getTime() - maxDelay;
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> // Periodic watermark: returning Long.MIN_VALUE means this path never emits.
> >>>>>>>>>>>>> @Override
> >>>>>>>>>>>>> public long getCurrentWatermark() {
> >>>>>>>>>>>>>   return Long.MIN_VALUE;
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> >>>>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> >>>>>>>>>>> Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> >>>>>>>>>>> Registered office: Unterföhring * Amtsgericht München * HRB 135082
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >
> 
