Hi,

I think what we will need at some point for this are approximate
whatermarks which correlate event and ingest time.

I think they have similar concepts in Millwheel/Dataflow.

Cheers,
Gyula
On Mon, Nov 30, 2015 at 5:29 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> as an addition. I don’t have a solution yet, for the general problem of
> what happens when a parallel instance of a source never receives elements.
> This watermark business is very tricky...
>
> Cheers,
> Aljoscha
> > On 30 Nov 2015, at 17:20, Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> > Hi Konstantin,
> > I finally nailed down the problem. :-)
> >
> > The basis of the problem is the fact that there is a mismatch in the
> parallelism of the Flink Kafka Consumer and the number of partitions in the
> Kafka Stream. I would assume that in your case the Kafka Stream has 1
> partition. This means, that only one of the parallel instances of the Flink
> Kafka Consumer ever receives element, which in turn means that only one of
> the parallel instances of the timestamp extractor ever receives elements.
> This means that no watermarks get emitted for the other parallel instances
> which in turn means that the watermark does not advance downstream because
> the watermark at an operator is the minimum over all upstream watermarks.
> This explains why ExampleTimestampExtractor1 only works in the case with
> parallelism=1.
> >
> > The reason why ExampleTimestampExtractor2 works in all parallelism
> settings is not very obvious. The secret is in this method:
> >
> > @Override
> > public long getCurrentWatermark() {
> >   return lastTimestamp - maxDelay;
> > }
> >
> > In the parallel instances that never receive any element lastTimestamp
> is set to Long.MIN_VALUE. So “lastTimestamp - maxDelay” is (Long.MAX_VALUE
> - maxDelay (+1)). Now, because the watermark at an operator is always the
> minimum over all watermarks from upstream operators the watermark at the
> window operator always tracks the watermark of the parallel instance that
> receives elements.
> >
> > I hope this helps, but please let me know if I should provide more
> explanation. This is a very tricky topic.
> >
> > Cheers,
> > Aljoscha
> >
> >> On 29 Nov 2015, at 21:18, Konstantin Knauf <
> konstantin.kn...@tngtech.com> wrote:
> >>
> >> Hi Aljoscha,
> >>
> >> I have put together a gist [1] with two classes, a short processing
> >> pipeline, which shows the behavior and a data generator to write records
> >> into Kafka. I hope I remembered everything we discussed correctly.
> >>
> >> So basically in the example it works with "TimestampExtractor1" only for
> >> parallelism 1, with "TimestampExtractor2" it works regardless of the
> >> parallelism. Run from the IDE.
> >>
> >> Let me know if you need anything else.
> >>
> >> Cheers,
> >>
> >> Konstantin
> >>
> >> [1] https://gist.github.com/knaufk/d57b5c3c7db576f3350d
> >>
> >> On 25.11.2015 21:15, Konstantin Knauf wrote:
> >>> Hi Aljoscha,
> >>>
> >>> sure, will do. I have neither found a solution. I won't have time to
> put
> >>> a minimal example together before the weekend though.
> >>>
> >>> Cheers,
> >>>
> >>> Konstantin
> >>>
> >>> On 25.11.2015 19:10, Aljoscha Krettek wrote:
> >>>> Hi Konstantin,
> >>>> I still didn’t come up with an explanation for the behavior. Could
> you maybe send me example code (and example data if it is necessary to
> reproduce the problem.)? This would really help me pinpoint the problem.
> >>>>
> >>>> Cheers,
> >>>> Aljoscha
> >>>>> On 17 Nov 2015, at 21:42, Konstantin Knauf <
> konstantin.kn...@tngtech.com> wrote:
> >>>>>
> >>>>> Hi Aljoscha,
> >>>>>
> >>>>> Are you sure? I am running the job from my IDE at the moment.
> >>>>>
> >>>>> If I set
> >>>>>
> >>>>> StreamExecutionEnvironment.setParallelism(1);
> >>>>>
> >>>>> I works with the old TimestampExtractor (returning Long.MIN_VALUE
> from
> >>>>> getCurrentWatermark() and emitting a watermark at every record)
> >>>>>
> >>>>> If I set
> >>>>>
> >>>>> StreamExecutionEnvironment.setParallelism(5);
> >>>>>
> >>>>> it does not work.
> >>>>>
> >>>>> So, if I understood you correctly, it is the opposite of what you
> were
> >>>>> expecting?!
> >>>>>
> >>>>> Cheers,
> >>>>>
> >>>>> Konstantin
> >>>>>
> >>>>>
> >>>>> On 17.11.2015 11:32, Aljoscha Krettek wrote:
> >>>>>> Hi,
> >>>>>> actually, the bug is more subtle. Normally, it is not a problem
> that the TimestampExtractor sometimes emits a watermark that is lower than
> the one before. (This is the result of the bug with Long.MIN_VALUE I
> mentioned before). The stream operators wait for watermarks from all
> upstream operators and only advance the watermark monotonically in lockstep
> with them. This way, the watermark cannot decrease at an operator.
> >>>>>>
> >>>>>> In your case, you have a topology with parallelism 1, I assume. In
> that case the operators are chained. (There is no separate operators but
> basically only one operator and element transmission happens in function
> calls). In this setting the watermarks are directly forwarded to operators
> without going through the logic I mentioned above.
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Aljoscha
> >>>>>>> On 16 Nov 2015, at 18:13, Konstantin Knauf <
> konstantin.kn...@tngtech.com> wrote:
> >>>>>>>
> >>>>>>> Hi Aljoscha,
> >>>>>>>
> >>>>>>> I changed the Timestamp Extraktor to save the lastSeenTimestamp
> and only
> >>>>>>> emit with getCurrentWatermark [1] as you suggested. So basically I
> do
> >>>>>>> the opposite than before (only watermarks per events vs only
> watermarks
> >>>>>>> per autowatermark). And now it works :). The question remains, why
> it
> >>>>>>> did not work before. As far as I see, it is an issue with the first
> >>>>>>> TimestmapExtractor itself?!
> >>>>>>>
> >>>>>>> Does getCurrentWatermark(..) somehow "overpower" the extracted
> watermarks?
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>>
> >>>>>>> Konstantin
> >>>>>>>
> >>>>>>> [1]
> >>>>>>>
> >>>>>>> final private long maxDelay;
> >>>>>>> private long lastTimestamp = Long.MIN_VALUE;
> >>>>>>>
> >>>>>>> public PojoTimestampExtractor(long maxDelay) {
> >>>>>>>     this.maxDelay = maxDelay;
> >>>>>>> }
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> public long extractTimestamp(Pojo pojo, long l) {
> >>>>>>>     lastTimestamp = pojo.getTime();
> >>>>>>>     return pojo.getTime();
> >>>>>>> }
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> public long extractWatermark(Pojo pojo, long l) {
> >>>>>>>     return Long.MIN_VALUE;
> >>>>>>> }
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> public long getCurrentWatermark() {
> >>>>>>>     return lastTimestamp - maxDelay;
> >>>>>>> }
> >>>>>>>
> >>>>>>>
> >>>>>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
> >>>>>>>> Hi,
> >>>>>>>> yes, at your data-rate emitting a watermark for every element
> should not be a problem. It could become a problem with higher data-rates
> since the system can get overwhelmed if every element also generates a
> watermark. In that case I would suggest storing the lastest
> element-timestamp in an internal field and only emitting in
> getCurrentWatermark(), since then, then the watermark interval can be tunes
> using the auto-watermark interval setting.
> >>>>>>>>
> >>>>>>>> But that should not be the cause of the problem that you
> currently have. Would you maybe be willing to send me some (mock) example
> data and the code so that I can reproduce the problem and have a look at
> it? to aljoscha at apache.org.
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Aljoscha
> >>>>>>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf <
> konstantin.kn...@tngtech.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Aljoscha,
> >>>>>>>>>
> >>>>>>>>> ok, now I at least understand, why it works with
> fromElements(...). For
> >>>>>>>>> the rest I am not so sure.
> >>>>>>>>>
> >>>>>>>>>> What this means in your case is that the watermark can only
> advance if
> >>>>>>>>> a new element arrives, because only then is the watermark
> updated.
> >>>>>>>>>
> >>>>>>>>> But new elements arrive all the time, about 50/s, or do you mean
> >>>>>>>>> something else?
> >>>>>>>>>
> >>>>>>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be
> an ok
> >>>>>>>>> choice, if i understand the semantics correctly. It just affects
> >>>>>>>>> watermarking in the absence of events, right?
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>>
> >>>>>>>>> Konstantin
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
> >>>>>>>>>> Hi,
> >>>>>>>>>> it could be what Gyula mentioned. Let me first go a bit into
> how the TimestampExtractor works internally.
> >>>>>>>>>>
> >>>>>>>>>> First, the timestamp extractor internally keeps the value of
> the last emitted watermark. Then, the semantics of the TimestampExtractor
> are as follows :
> >>>>>>>>>> - the result of extractTimestamp is taken and it replaces the
> internal timestamp of the element
> >>>>>>>>>> - if the result of extractWatermark is larger than the last
> watermark the new value is emitted as a watermark and the value is stored
> >>>>>>>>>> - getCurrentWatermark is called on the specified auto-watermark
> interval, if the returned value is larger than the last watermark it is
> emitted and stored as last watermark
> >>>>>>>>>>
> >>>>>>>>>> What this means in your case is that the watermark can only
> advance if a new element arrives, because only then is the watermark
> updated.
> >>>>>>>>>>
> >>>>>>>>>> The reason why you see results if you use fromElements is that
> the window-operator also emits all the windows that it currently has
> buffered if the program closes. This happens in the case of fromElements
> because only a finite number of elements is emitted, after which the source
> closes, thereby finishing the whole program.
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Aljoscha
> >>>>>>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com>
> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Could this part of the extractor be the problem Aljoscha?
> >>>>>>>>>>>
> >>>>>>>>>>> @Override
> >>>>>>>>>>> public long getCurrentWatermark() {
> >>>>>>>>>>>    return Long.MIN_VALUE;
> >>>>>>>>>>> }
> >>>>>>>>>>>
> >>>>>>>>>>> Gyula
> >>>>>>>>>>>
> >>>>>>>>>>> Konstantin Knauf <konstantin.kn...@tngtech.com> ezt írta
> (időpont: 2015. nov. 16., H, 10:39):
> >>>>>>>>>>> Hi Aljoscha,
> >>>>>>>>>>>
> >>>>>>>>>>> thanks for your answer. Yes I am using the same
> TimestampExtractor-Class.
> >>>>>>>>>>>
> >>>>>>>>>>> The timestamps look good to me. Here an example.
> >>>>>>>>>>>
> >>>>>>>>>>> {"time": 1447666537260, ...} And parsed:
> 2015-11-16T10:35:37.260+01:00
> >>>>>>>>>>>
> >>>>>>>>>>> The order now is
> >>>>>>>>>>>
> >>>>>>>>>>> stream
> >>>>>>>>>>> .map(dummyMapper)
> >>>>>>>>>>> .assignTimestamps(...)
> >>>>>>>>>>> .timeWindow(...)
> >>>>>>>>>>>
> >>>>>>>>>>> Is there a way to print out the assigned timestamps after
> >>>>>>>>>>> stream.assignTimestamps(...)?
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>>
> >>>>>>>>>>> Konstantin
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
> >>>>>>>>>>>> Hi,
> >>>>>>>>>>>> are you also using the timestamp extractor when you are using
> env.fromCollection().
> >>>>>>>>>>>>
> >>>>>>>>>>>> Could you maybe insert a dummy mapper after the Kafka source
> that just prints the element and forwards it? To see if the elements come
> with a good timestamp from Kafka.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf <
> konstantin.kn...@tngtech.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I am using a very simple TimestampExtractor like [1], which
> just
> >>>>>>>>>>>>> extracts a millis timestamp from a POJO. In my streaming
> job, I read in
> >>>>>>>>>>>>> these POJOs from Kafka using the FlinkKafkaConsumer082 like
> this:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<
> >>>>>>>>>>>>> (parameterTool.getRequired("topic"),
> >>>>>>>>>>>>>           new AvroPojoDeserializationSchema(),
> >>>>>>>>>>>>> parameterTool.getProperties()))
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I have timestampEnabled() and the TimeCharacteristics are
> EventTime,
> >>>>>>>>>>>>> AutoWatermarkIntervall is 500.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The problem is, when I do something like:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
> >>>>>>>>>>>>> .timeWindowAll(Time.of(1, TimeUnit.SECONDS)
> >>>>>>>>>>>>> .sum(..)
> >>>>>>>>>>>>> .print()
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> env.execute();
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> the windows never get triggered.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> If I use ProcessingTime it works.
> >>>>>>>>>>>>> If I use env.fromCollection(...) instead of the KafkaSource
> it works
> >>>>>>>>>>>>> with EventTime, too.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Any ideas what I could be doing wrong are highly appreciated.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Konstantin
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1]:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> public class PojoTimestampExtractor implements
> TimestampExtractor<Pojo> {
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> final private long maxDelay;
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> public  PojoTimestampExtractor(long maxDelay) {
> >>>>>>>>>>>>>   this.maxDelay = maxDelay;
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> @Override
> >>>>>>>>>>>>> public long extractTimestamp(Pojo fightEvent, long l) {
> >>>>>>>>>>>>>   return pojo.getTime();
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> @Override
> >>>>>>>>>>>>> public long extractWatermark(Pojo pojo, long l) {
> >>>>>>>>>>>>>   return pojo.getTime() - maxDelay;
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> @Override
> >>>>>>>>>>>>> public long getCurrentWatermark() {
> >>>>>>>>>>>>>   return Long.MIN_VALUE;
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com *
> +49-174-3413182
> >>>>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774
> Unterföhring
> >>>>>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert
> Dahlke
> >>>>>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com *
> +49-174-3413182
> >>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> >>>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert
> Dahlke
> >>>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> >>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> >>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> >>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> >>>>>>
> >>>>>>
> >>>>>
> >>>>> --
> >>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> >>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> >>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> >>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> >>>>
> >>>>
> >>>
> >>
> >> --
> >> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> >> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> >> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> >> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> >
>
>

Reply via email to