The PRs are merged. :D
> On 11 Dec 2015, at 17:28, Stephan Ewen <se...@apache.org> wrote:
>
> A solution for that is in these two pull requests:
>
> https://github.com/apache/flink/pull/1447
>
> https://github.com/apache/flink/pull/1448
>
> On Fri, Dec 4, 2015 at 10:21 PM, Robert Metzger <rmetz...@apache.org> wrote:
> I think we need to find a solution for this problem soon.
> Another user is most likely affected:
> http://stackoverflow.com/q/34090808/568695
>
> I've filed a JIRA for the problem:
> https://issues.apache.org/jira/browse/FLINK-3121
>
>
> On Mon, Nov 30, 2015 at 5:58 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Maybe. In the Kafka case we just need to ensure that parallel instances of
> the source that know they don’t have any partitions assigned to them
> emit Long.MAX_VALUE as a watermark.
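>
> Roughly, something like the following inside the source (just a sketch with
> made-up names, not the actual FlinkKafkaConsumer code):
>
> if (subscribedPartitions.isEmpty()) {
>     // this subtask will never receive elements, so emit a final watermark
>     // that does not hold back the downstream operators
>     sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
>     return;
> }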
>
> > On 30 Nov 2015, at 17:50, Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> > Hi,
> >
> > I think what we will need at some point for this are approximate
> > watermarks which correlate event and ingest time.
> >
> > I think they have similar concepts in Millwheel/Dataflow.
> >
> > Cheers,
> > Gyula
> > On Mon, Nov 30, 2015 at 5:29 PM Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> > Hi,
> > as an addition: I don’t have a solution yet for the general problem of
> > what happens when a parallel instance of a source never receives elements.
> > This watermark business is very tricky...
> >
> > Cheers,
> > Aljoscha
> > > On 30 Nov 2015, at 17:20, Aljoscha Krettek <aljos...@apache.org> wrote:
> > >
> > > Hi Konstantin,
> > > I finally nailed down the problem. :-)
> > >
> > > The root of the problem is a mismatch between the parallelism of the
> > > Flink Kafka Consumer and the number of partitions in the Kafka topic. I
> > > would assume that in your case the topic has 1 partition. This means
> > > that only one of the parallel instances of the Flink Kafka Consumer ever
> > > receives elements, which in turn means that only one of the parallel
> > > instances of the timestamp extractor ever receives elements. The other
> > > parallel instances therefore never emit watermarks, and because the
> > > watermark at an operator is the minimum over all upstream watermarks,
> > > the watermark never advances downstream. This explains why
> > > ExampleTimestampExtractor1 only works in the case with parallelism=1.
> > >
> > > The reason why ExampleTimestampExtractor2 works in all parallelism
> > > settings is not very obvious. The secret is in this method:
> > >
> > > @Override
> > > public long getCurrentWatermark() {
> > >     return lastTimestamp - maxDelay;
> > > }
> > >
> > > In the parallel instances that never receive any element, lastTimestamp
> > > stays at Long.MIN_VALUE, so “lastTimestamp - maxDelay” underflows and
> > > wraps around to a huge positive value (Long.MAX_VALUE - maxDelay + 1).
> > > Because the watermark at an operator is always the minimum over all
> > > watermarks from upstream operators, the watermark at the window operator
> > > always tracks the watermark of the parallel instance that actually
> > > receives elements.
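> > >
> > > To make the wrap-around concrete (a tiny example, using maxDelay = 6000,
> > > the value that appears further down in this thread):
> > >
> > > long lastTimestamp = Long.MIN_VALUE; // -9223372036854775808
> > > long maxDelay = 6000;
> > > long watermark = lastTimestamp - maxDelay;
> > > // underflows to 9223372036854769808, i.e. Long.MAX_VALUE - maxDelay + 1,
> > > // which is close to Long.MAX_VALUE and thus never the minimum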
> > >
> > > I hope this helps, but please let me know if I should provide more
> > > explanation. This is a very tricky topic.
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > >> On 29 Nov 2015, at 21:18, Konstantin Knauf
> > >> <konstantin.kn...@tngtech.com> wrote:
> > >>
> > >> Hi Aljoscha,
> > >>
> > >> I have put together a gist [1] with two classes: a short processing
> > >> pipeline, which shows the behavior, and a data generator to write records
> > >> into Kafka. I hope I remembered everything we discussed correctly.
> > >>
> > >> So basically, in the example it works with "TimestampExtractor1" only for
> > >> parallelism 1, while with "TimestampExtractor2" it works regardless of the
> > >> parallelism (run from the IDE).
> > >>
> > >> Let me know if you need anything else.
> > >>
> > >> Cheers,
> > >>
> > >> Konstantin
> > >>
> > >> [1] https://gist.github.com/knaufk/d57b5c3c7db576f3350d
> > >>
> > >> On 25.11.2015 21:15, Konstantin Knauf wrote:
> > >>> Hi Aljoscha,
> > >>>
> > >>> sure, will do. I haven't found a solution either. I won't have time to put
> > >>> a minimal example together before the weekend though.
> > >>>
> > >>> Cheers,
> > >>>
> > >>> Konstantin
> > >>>
> > >>> On 25.11.2015 19:10, Aljoscha Krettek wrote:
> > >>>> Hi Konstantin,
> > >>>> I still didn’t come up with an explanation for the behavior. Could you
> > >>>> maybe send me example code (and example data, if it is necessary to
> > >>>> reproduce the problem)? This would really help me pinpoint the
> > >>>> problem.
> > >>>>
> > >>>> Cheers,
> > >>>> Aljoscha
> > >>>>> On 17 Nov 2015, at 21:42, Konstantin Knauf
> > >>>>> <konstantin.kn...@tngtech.com> wrote:
> > >>>>>
> > >>>>> Hi Aljoscha,
> > >>>>>
> > >>>>> Are you sure? I am running the job from my IDE at the moment.
> > >>>>>
> > >>>>> If I set
> > >>>>>
> > >>>>> StreamExecutionEnvironment.setParallelism(1);
> > >>>>>
> > >>>>> It works with the old TimestampExtractor (returning Long.MIN_VALUE from
> > >>>>> getCurrentWatermark() and emitting a watermark at every record).
> > >>>>>
> > >>>>> If I set
> > >>>>>
> > >>>>> StreamExecutionEnvironment.setParallelism(5);
> > >>>>>
> > >>>>> it does not work.
> > >>>>>
> > >>>>> So, if I understood you correctly, it is the opposite of what you were
> > >>>>> expecting?!
> > >>>>>
> > >>>>> Cheers,
> > >>>>>
> > >>>>> Konstantin
> > >>>>>
> > >>>>>
> > >>>>> On 17.11.2015 11:32, Aljoscha Krettek wrote:
> > >>>>>> Hi,
> > >>>>>> actually, the bug is more subtle. Normally, it is not a problem that
> > >>>>>> the TimestampExtractor sometimes emits a watermark that is lower
> > >>>>>> than the one before. (This is the result of the bug with
> > >>>>>> Long.MIN_VALUE I mentioned before). The stream operators wait for
> > >>>>>> watermarks from all upstream operators and only advance the
> > >>>>>> watermark monotonically in lockstep with them. This way, the
> > >>>>>> watermark cannot decrease at an operator.
> > >>>>>>
> > >>>>>> In your case, you have a topology with parallelism 1, I assume. In
> > >>>>>> that case the operators are chained (there are no separate operators,
> > >>>>>> basically only one operator, and element transmission happens via
> > >>>>>> function calls). In this setting the watermarks are directly
> > >>>>>> forwarded to the downstream operators without going through the logic
> > >>>>>> I mentioned above.
> > >>>>>>
> > >>>>>> Cheers,
> > >>>>>> Aljoscha
> > >>>>>>> On 16 Nov 2015, at 18:13, Konstantin Knauf
> > >>>>>>> <konstantin.kn...@tngtech.com> wrote:
> > >>>>>>>
> > >>>>>>> Hi Aljoscha,
> > >>>>>>>
> > >>>>>>> I changed the TimestampExtractor to save the lastSeenTimestamp and
> > >>>>>>> only emit via getCurrentWatermark() [1], as you suggested. So basically
> > >>>>>>> I do the opposite of before (watermarks only on the auto-watermark
> > >>>>>>> interval instead of only per event). And now it works :). The question
> > >>>>>>> remains why it did not work before. As far as I can see, it is an issue
> > >>>>>>> with the first TimestampExtractor itself?!
> > >>>>>>>
> > >>>>>>> Does getCurrentWatermark(..) somehow "overpower" the extracted
> > >>>>>>> watermarks?
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>>
> > >>>>>>> Konstantin
> > >>>>>>>
> > >>>>>>> [1]
> > >>>>>>>
> > >>>>>>> private final long maxDelay;
> > >>>>>>> private long lastTimestamp = Long.MIN_VALUE;
> > >>>>>>>
> > >>>>>>> public PojoTimestampExtractor(long maxDelay) {
> > >>>>>>>     this.maxDelay = maxDelay;
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> @Override
> > >>>>>>> public long extractTimestamp(Pojo pojo, long l) {
> > >>>>>>>     lastTimestamp = pojo.getTime();
> > >>>>>>>     return pojo.getTime();
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> @Override
> > >>>>>>> public long extractWatermark(Pojo pojo, long l) {
> > >>>>>>>     return Long.MIN_VALUE;
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> @Override
> > >>>>>>> public long getCurrentWatermark() {
> > >>>>>>>     return lastTimestamp - maxDelay;
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
> > >>>>>>>> Hi,
> > >>>>>>>> yes, at your data rate emitting a watermark for every element
> > >>>>>>>> should not be a problem. It could become a problem with higher
> > >>>>>>>> data rates, since the system can get overwhelmed if every element
> > >>>>>>>> also generates a watermark. In that case I would suggest storing
> > >>>>>>>> the latest element timestamp in an internal field and only
> > >>>>>>>> emitting in getCurrentWatermark(), since then the watermark
> > >>>>>>>> interval can be tuned using the auto-watermark interval setting.
> > >>>>>>>>
> > >>>>>>>> But that should not be the cause of the problem that you currently
> > >>>>>>>> have. Would you maybe be willing to send me some (mock) example
> > >>>>>>>> data and the code so that I can reproduce the problem and have a
> > >>>>>>>> look at it? You can send it to aljoscha at apache.org.
> > >>>>>>>>
> > >>>>>>>> Cheers,
> > >>>>>>>> Aljoscha
> > >>>>>>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf
> > >>>>>>>>> <konstantin.kn...@tngtech.com> wrote:
> > >>>>>>>>>
> > >>>>>>>>> Hi Aljoscha,
> > >>>>>>>>>
> > >>>>>>>>> ok, now I at least understand why it works with fromElements(...).
> > >>>>>>>>> For the rest I am not so sure.
> > >>>>>>>>>
> > >>>>>>>>>> What this means in your case is that the watermark can only
> > >>>>>>>>>> advance if
> > >>>>>>>>> a new element arrives, because only then is the watermark updated.
> > >>>>>>>>>
> > >>>>>>>>> But new elements arrive all the time, about 50/s, or do you mean
> > >>>>>>>>> something else?
> > >>>>>>>>>
> > >>>>>>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an ok
> > >>>>>>>>> choice, if I understand the semantics correctly. It just affects
> > >>>>>>>>> watermarking in the absence of events, right?
> > >>>>>>>>>
> > >>>>>>>>> Cheers,
> > >>>>>>>>>
> > >>>>>>>>> Konstantin
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
> > >>>>>>>>>> Hi,
> > >>>>>>>>>> it could be what Gyula mentioned. Let me first go a bit into how
> > >>>>>>>>>> the TimestampExtractor works internally.
> > >>>>>>>>>>
> > >>>>>>>>>> First, the timestamp extractor internally keeps the value of the
> > >>>>>>>>>> last emitted watermark. Then, the semantics of the
> > >>>>>>>>>> TimestampExtractor are as follows:
> > >>>>>>>>>> - the result of extractTimestamp is taken and replaces the
> > >>>>>>>>>> internal timestamp of the element
> > >>>>>>>>>> - if the result of extractWatermark is larger than the last
> > >>>>>>>>>> watermark, the new value is emitted as a watermark and stored
> > >>>>>>>>>> - getCurrentWatermark is called on the specified auto-watermark
> > >>>>>>>>>> interval; if the returned value is larger than the last
> > >>>>>>>>>> watermark, it is emitted and stored as the last watermark
> > >>>>>>>>>>
> > >>>>>>>>>> What this means in your case is that the watermark can only
> > >>>>>>>>>> advance if a new element arrives, because only then is the
> > >>>>>>>>>> watermark updated.
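> > >>>>>>>>>>
> > >>>>>>>>>> In made-up pseudocode (not the actual Flink operator code, just to
> > >>>>>>>>>> illustrate the semantics), the logic is roughly:
> > >>>>>>>>>>
> > >>>>>>>>>> // called for every element
> > >>>>>>>>>> void onElement(T element, long previousTimestamp) {
> > >>>>>>>>>>     long ts = extractor.extractTimestamp(element, previousTimestamp);
> > >>>>>>>>>>     // ts replaces the internal timestamp of the element
> > >>>>>>>>>>     long wm = extractor.extractWatermark(element, ts);
> > >>>>>>>>>>     if (wm > lastEmittedWatermark) {
> > >>>>>>>>>>         lastEmittedWatermark = wm;
> > >>>>>>>>>>         output.emitWatermark(new Watermark(wm));
> > >>>>>>>>>>     }
> > >>>>>>>>>> }
> > >>>>>>>>>>
> > >>>>>>>>>> // called on every auto-watermark interval
> > >>>>>>>>>> void onAutoWatermarkInterval() {
> > >>>>>>>>>>     long wm = extractor.getCurrentWatermark();
> > >>>>>>>>>>     if (wm > lastEmittedWatermark) {
> > >>>>>>>>>>         lastEmittedWatermark = wm;
> > >>>>>>>>>>         output.emitWatermark(new Watermark(wm));
> > >>>>>>>>>>     }
> > >>>>>>>>>> }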
> > >>>>>>>>>>
> > >>>>>>>>>> The reason why you see results if you use fromElements is that
> > >>>>>>>>>> the window operator also emits all the windows that it currently
> > >>>>>>>>>> has buffered when the program finishes. This happens in the case of
> > >>>>>>>>>> fromElements because only a finite number of elements is
> > >>>>>>>>>> emitted, after which the source closes, thereby finishing the
> > >>>>>>>>>> whole program.
> > >>>>>>>>>>
> > >>>>>>>>>> Cheers,
> > >>>>>>>>>> Aljoscha
> > >>>>>>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> Could this part of the extractor be the problem Aljoscha?
> > >>>>>>>>>>>
> > >>>>>>>>>>> @Override
> > >>>>>>>>>>> public long getCurrentWatermark() {
> > >>>>>>>>>>>     return Long.MIN_VALUE;
> > >>>>>>>>>>> }
> > >>>>>>>>>>>
> > >>>>>>>>>>> Gyula
> > >>>>>>>>>>>
> > >>>>>>>>>>> Konstantin Knauf <konstantin.kn...@tngtech.com> ezt írta
> > >>>>>>>>>>> (időpont: 2015. nov. 16., H, 10:39):
> > >>>>>>>>>>> Hi Aljoscha,
> > >>>>>>>>>>>
> > >>>>>>>>>>> thanks for your answer. Yes I am using the same
> > >>>>>>>>>>> TimestampExtractor-Class.
> > >>>>>>>>>>>
> > >>>>>>>>>>> The timestamps look good to me. Here is an example.
> > >>>>>>>>>>>
> > >>>>>>>>>>> {"time": 1447666537260, ...} And parsed:
> > >>>>>>>>>>> 2015-11-16T10:35:37.260+01:00
> > >>>>>>>>>>>
> > >>>>>>>>>>> The order now is
> > >>>>>>>>>>>
> > >>>>>>>>>>> stream
> > >>>>>>>>>>>     .map(dummyMapper)
> > >>>>>>>>>>>     .assignTimestamps(...)
> > >>>>>>>>>>>     .timeWindow(...)
> > >>>>>>>>>>>
> > >>>>>>>>>>> Is there a way to print out the assigned timestamps after
> > >>>>>>>>>>> stream.assignTimestamps(...)?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Cheers,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Konstantin
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
> > >>>>>>>>>>>> Hi,
> > >>>>>>>>>>>> are you also using the timestamp extractor when you are using
> > >>>>>>>>>>>> env.fromCollection()?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Could you maybe insert a dummy mapper after the Kafka source
> > >>>>>>>>>>>> that just prints the element and forwards it, to see if the
> > >>>>>>>>>>>> elements come with a good timestamp from Kafka?
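> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Something like this should do (just a rough sketch; I'm assuming
> > >>>>>>>>>>>> your POJO type is called Pojo and has the getTime() method from
> > >>>>>>>>>>>> your timestamp extractor):
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> stream.map(new MapFunction<Pojo, Pojo>() {
> > >>>>>>>>>>>>     @Override
> > >>>>>>>>>>>>     public Pojo map(Pojo value) throws Exception {
> > >>>>>>>>>>>>         // print the element and its timestamp, then forward it unchanged
> > >>>>>>>>>>>>         System.out.println("element: " + value + ", time: " + value.getTime());
> > >>>>>>>>>>>>         return value;
> > >>>>>>>>>>>>     }
> > >>>>>>>>>>>> });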
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>> Aljoscha
> > >>>>>>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf
> > >>>>>>>>>>>>> <konstantin.kn...@tngtech.com> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi everyone,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I am using a very simple TimestampExtractor like [1], which
> > >>>>>>>>>>>>> just
> > >>>>>>>>>>>>> extracts a millis timestamp from a POJO. In my streaming job,
> > >>>>>>>>>>>>> I read in
> > >>>>>>>>>>>>> these POJOs from Kafka using the FlinkKafkaConsumer082 like
> > >>>>>>>>>>>>> this:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<Pojo>(
> > >>>>>>>>>>>>>         parameterTool.getRequired("topic"),
> > >>>>>>>>>>>>>         new AvroPojoDeserializationSchema(),
> > >>>>>>>>>>>>>         parameterTool.getProperties()));
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I have timestamps enabled, the TimeCharacteristic is EventTime,
> > >>>>>>>>>>>>> and the auto-watermark interval is 500.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The problem is, when I do something like:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
> > >>>>>>>>>>>>>     .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
> > >>>>>>>>>>>>>     .sum(..)
> > >>>>>>>>>>>>>     .print();
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> env.execute();
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> the windows never get triggered.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> If I use ProcessingTime it works.
> > >>>>>>>>>>>>> If I use env.fromCollection(...) instead of the KafkaSource
> > >>>>>>>>>>>>> it works
> > >>>>>>>>>>>>> with EventTime, too.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Any ideas what I could be doing wrong are highly appreciated.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Konstantin
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> [1]:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>     private final long maxDelay;
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>     public PojoTimestampExtractor(long maxDelay) {
> > >>>>>>>>>>>>>         this.maxDelay = maxDelay;
> > >>>>>>>>>>>>>     }
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>     @Override
> > >>>>>>>>>>>>>     public long extractTimestamp(Pojo pojo, long l) {
> > >>>>>>>>>>>>>         return pojo.getTime();
> > >>>>>>>>>>>>>     }
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>     @Override
> > >>>>>>>>>>>>>     public long extractWatermark(Pojo pojo, long l) {
> > >>>>>>>>>>>>>         return pojo.getTime() - maxDelay;
> > >>>>>>>>>>>>>     }
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>     @Override
> > >>>>>>>>>>>>>     public long getCurrentWatermark() {
> > >>>>>>>>>>>>>         return Long.MIN_VALUE;
> > >>>>>>>>>>>>>     }
> > >>>>>>>>>>>>> }
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> --
> > >>>>>>>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com *
> > >>>>>>>>>>> +49-174-3413182
> > >>>>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> > >>>>>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert
> > >>>>>>>>>>> Dahlke
> > >>>>>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> > >
> >
>
>
>