A solution for that is in these two pull requests:

https://github.com/apache/flink/pull/1447

https://github.com/apache/flink/pull/1448

On Fri, Dec 4, 2015 at 10:21 PM, Robert Metzger <rmetz...@apache.org> wrote:

> I think we need to find a solution for this problem soon.
> Another user is most likely affected:
> http://stackoverflow.com/q/34090808/568695
>
> I've filed a JIRA for the problem:
> https://issues.apache.org/jira/browse/FLINK-3121
>
>
> On Mon, Nov 30, 2015 at 5:58 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Maybe. In the Kafka case we just need to ensure that parallel instances
>> of the source that know that they don’t have any partitions assigned to
>> them emit Long.MAX_VALUE as a watermark.
>>
>> > On 30 Nov 2015, at 17:50, Gyula Fóra <gyula.f...@gmail.com> wrote:
>> >
>> > Hi,
>> >
>> > I think what we will need at some point for this are approximate
>> watermarks which correlate event and ingest time.
>> >
>> > I think they have similar concepts in Millwheel/Dataflow.
>> >
>> > Cheers,
>> > Gyula
>> > On Mon, Nov 30, 2015 at 5:29 PM Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>> > Hi,
>> > as an addition: I don’t have a solution yet for the general problem of
>> what happens when a parallel instance of a source never receives elements.
>> This watermark business is very tricky...
>> >
>> > Cheers,
>> > Aljoscha
>> > > On 30 Nov 2015, at 17:20, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>> > >
>> > > Hi Konstantin,
>> > > I finally nailed down the problem. :-)
>> > >
>> > > The basis of the problem is the fact that there is a mismatch in the
>> parallelism of the Flink Kafka Consumer and the number of partitions in the
>> Kafka Stream. I would assume that in your case the Kafka Stream has 1
>> partition. This means that only one of the parallel instances of the Flink
>> Kafka Consumer ever receives elements, which in turn means that only one of
>> the parallel instances of the timestamp extractor ever receives elements.
>> This means that no watermarks get emitted for the other parallel instances
>> which in turn means that the watermark does not advance downstream because
>> the watermark at an operator is the minimum over all upstream watermarks.
>> This explains why ExampleTimestampExtractor1 only works in the case with
>> parallelism=1.
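
To make the "minimum over all upstream watermarks" rule above concrete, here is a minimal sketch in plain Java. It is not Flink code: the class and method names are made up for illustration, and modeling "never emitted a watermark" as Long.MIN_VALUE is an assumption.

```java
import java.util.Arrays;

public class WatermarkMinSketch {

    /** Hypothetical helper: the watermark an operator sees is the minimum over all upstream channels. */
    static long combinedWatermark(long[] upstreamWatermarks) {
        return Arrays.stream(upstreamWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Parallelism 5, Kafka topic with 1 partition: instance 0 receives elements,
        // instances 1..4 are idle. Assumption: an idle instance counts as Long.MIN_VALUE.
        long[] parallelFive = {
            1447666537260L, Long.MIN_VALUE, Long.MIN_VALUE, Long.MIN_VALUE, Long.MIN_VALUE
        };
        // The idle instances hold the combined watermark back, so windows never fire.
        System.out.println(combinedWatermark(parallelFive) == Long.MIN_VALUE); // true

        // Parallelism 1: the only instance receives all elements.
        long[] parallelOne = {1447666537260L};
        System.out.println(combinedWatermark(parallelOne)); // 1447666537260
    }
}
```

With a single upstream instance the minimum is just that instance's watermark, which matches the observation that the first extractor only works with parallelism=1.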
>> > >
>> > > The reason why ExampleTimestampExtractor2 works in all parallelism
>> settings is not very obvious. The secret is in this method:
>> > >
>> > > @Override
>> > > public long getCurrentWatermark() {
>> > >   return lastTimestamp - maxDelay;
>> > > }
>> > >
>> > > In the parallel instances that never receive any element
>> lastTimestamp stays at Long.MIN_VALUE. So “lastTimestamp - maxDelay” wraps
>> around to (Long.MAX_VALUE - maxDelay + 1). Now, because the watermark at an operator
>> is always the minimum over all watermarks from upstream operators the
>> watermark at the window operator always tracks the watermark of the
>> parallel instance that receives elements.
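
The wrap-around described above can be checked with plain long arithmetic. This is only a sketch: the class name is made up, and maxDelay = 6000 is taken from the PojoTimestampExtractor(6000) call quoted further down in the thread.

```java
public class WatermarkOverflowSketch {

    /** Same expression as getCurrentWatermark() in the second extractor. */
    static long currentWatermark(long lastTimestamp, long maxDelay) {
        return lastTimestamp - maxDelay; // plain long subtraction, wraps silently
    }

    public static void main(String[] args) {
        long maxDelay = 6000L;

        // Idle instance: lastTimestamp was never updated from Long.MIN_VALUE.
        long idle = currentWatermark(Long.MIN_VALUE, maxDelay);
        System.out.println(idle == Long.MAX_VALUE - maxDelay + 1); // true

        // Active instance: lastTimestamp is a real event time in milliseconds.
        long active = currentWatermark(1447666537260L, maxDelay);

        // Downstream takes the minimum, so the huge value from the idle
        // instance never wins and the active instance drives the watermark.
        System.out.println(Math.min(idle, active) == active); // true
    }
}
```

So the second extractor works by accident: the idle instances report a value close to Long.MAX_VALUE, which is what we would want them to emit on purpose.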
>> > >
>> > > I hope this helps, but please let me know if I should provide more
>> explanation. This is a very tricky topic.
>> > >
>> > > Cheers,
>> > > Aljoscha
>> > >
>> > >> On 29 Nov 2015, at 21:18, Konstantin Knauf <
>> konstantin.kn...@tngtech.com> wrote:
>> > >>
>> > >> Hi Aljoscha,
>> > >>
>> > >> I have put together a gist [1] with two classes, a short processing
>> > >> pipeline, which shows the behavior and a data generator to write
>> records
>> > >> into Kafka. I hope I remembered everything we discussed correctly.
>> > >>
>> > >> So basically in the example it works with "TimestampExtractor1" only
>> for
>> > >> parallelism 1, with "TimestampExtractor2" it works regardless of the
>> > >> parallelism. Run from the IDE.
>> > >>
>> > >> Let me know if you need anything else.
>> > >>
>> > >> Cheers,
>> > >>
>> > >> Konstantin
>> > >>
>> > >> [1] https://gist.github.com/knaufk/d57b5c3c7db576f3350d
>> > >>
>> > >> On 25.11.2015 21:15, Konstantin Knauf wrote:
>> > >>> Hi Aljoscha,
>> > >>>
>> > >>> sure, will do. I have not found a solution either. I won't have time
>> to put
>> > >>> a minimal example together before the weekend though.
>> > >>>
>> > >>> Cheers,
>> > >>>
>> > >>> Konstantin
>> > >>>
>> > >>> On 25.11.2015 19:10, Aljoscha Krettek wrote:
>> > >>>> Hi Konstantin,
>> > >>>> I still didn’t come up with an explanation for the behavior. Could
>> you maybe send me example code (and example data if it is necessary to
>> reproduce the problem)? This would really help me pinpoint the problem.
>> > >>>>
>> > >>>> Cheers,
>> > >>>> Aljoscha
>> > >>>>> On 17 Nov 2015, at 21:42, Konstantin Knauf <
>> konstantin.kn...@tngtech.com> wrote:
>> > >>>>>
>> > >>>>> Hi Aljoscha,
>> > >>>>>
>> > >>>>> Are you sure? I am running the job from my IDE at the moment.
>> > >>>>>
>> > >>>>> If I set
>> > >>>>>
>> > >>>>> StreamExecutionEnvironment.setParallelism(1);
>> > >>>>>
>> > >>>>> It works with the old TimestampExtractor (returning Long.MIN_VALUE
>> from
>> > >>>>> getCurrentWatermark() and emitting a watermark at every record)
>> > >>>>>
>> > >>>>> If I set
>> > >>>>>
>> > >>>>> StreamExecutionEnvironment.setParallelism(5);
>> > >>>>>
>> > >>>>> it does not work.
>> > >>>>>
>> > >>>>> So, if I understood you correctly, it is the opposite of what you
>> were
>> > >>>>> expecting?!
>> > >>>>>
>> > >>>>> Cheers,
>> > >>>>>
>> > >>>>> Konstantin
>> > >>>>>
>> > >>>>>
>> > >>>>> On 17.11.2015 11:32, Aljoscha Krettek wrote:
>> > >>>>>> Hi,
>> > >>>>>> actually, the bug is more subtle. Normally, it is not a problem
>> that the TimestampExtractor sometimes emits a watermark that is lower than
>> the one before. (This is the result of the bug with Long.MIN_VALUE I
>> mentioned before). The stream operators wait for watermarks from all
>> upstream operators and only advance the watermark monotonically in lockstep
>> with them. This way, the watermark cannot decrease at an operator.
>> > >>>>>>
>> > >>>>>> In your case, you have a topology with parallelism 1, I assume.
>> In that case the operators are chained. (There are no separate operators but
>> basically only one operator and element transmission happens in function
>> calls). In this setting the watermarks are directly forwarded to operators
>> without going through the logic I mentioned above.
>> > >>>>>>
>> > >>>>>> Cheers,
>> > >>>>>> Aljoscha
>> > >>>>>>> On 16 Nov 2015, at 18:13, Konstantin Knauf <
>> konstantin.kn...@tngtech.com> wrote:
>> > >>>>>>>
>> > >>>>>>> Hi Aljoscha,
>> > >>>>>>>
>> > >>>>>>> I changed the TimestampExtractor to save the lastSeenTimestamp
>> and only
>> > >>>>>>> emit with getCurrentWatermark [1] as you suggested. So
>> basically I do
>> > >>>>>>> the opposite of before (only watermarks per event vs. only
>> watermarks
>> > >>>>>>> per autowatermark). And now it works :). The question remains,
>> why it
>> > >>>>>>> did not work before. As far as I see, it is an issue with the
>> first
>> > >>>>>>> TimestampExtractor itself?!
>> > >>>>>>>
>> > >>>>>>> Does getCurrentWatermark(..) somehow "overpower" the extracted
>> watermarks?
>> > >>>>>>>
>> > >>>>>>> Cheers,
>> > >>>>>>>
>> > >>>>>>> Konstantin
>> > >>>>>>>
>> > >>>>>>> [1]
>> > >>>>>>>
>> > >>>>>>> final private long maxDelay;
>> > >>>>>>> private long lastTimestamp = Long.MIN_VALUE;
>> > >>>>>>>
>> > >>>>>>> public PojoTimestampExtractor(long maxDelay) {
>> > >>>>>>>     this.maxDelay = maxDelay;
>> > >>>>>>> }
>> > >>>>>>>
>> > >>>>>>> @Override
>> > >>>>>>> public long extractTimestamp(Pojo pojo, long l) {
>> > >>>>>>>     lastTimestamp = pojo.getTime();
>> > >>>>>>>     return pojo.getTime();
>> > >>>>>>> }
>> > >>>>>>>
>> > >>>>>>> @Override
>> > >>>>>>> public long extractWatermark(Pojo pojo, long l) {
>> > >>>>>>>     return Long.MIN_VALUE;
>> > >>>>>>> }
>> > >>>>>>>
>> > >>>>>>> @Override
>> > >>>>>>> public long getCurrentWatermark() {
>> > >>>>>>>     return lastTimestamp - maxDelay;
>> > >>>>>>> }
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
>> > >>>>>>>> Hi,
>> > >>>>>>>> yes, at your data-rate emitting a watermark for every element
>> should not be a problem. It could become a problem with higher data-rates
>> since the system can get overwhelmed if every element also generates a
>> watermark. In that case I would suggest storing the latest
>> element-timestamp in an internal field and only emitting in
>> getCurrentWatermark(), since then the watermark interval can be tuned
>> using the auto-watermark interval setting.
>> > >>>>>>>>
>> > >>>>>>>> But that should not be the cause of the problem that you
>> currently have. Would you maybe be willing to send me some (mock) example
>> data and the code so that I can reproduce the problem and have a look at
>> it? You can send it to aljoscha at apache.org.
>> > >>>>>>>>
>> > >>>>>>>> Cheers,
>> > >>>>>>>> Aljoscha
>> > >>>>>>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf <
>> konstantin.kn...@tngtech.com> wrote:
>> > >>>>>>>>>
>> > >>>>>>>>> Hi Aljoscha,
>> > >>>>>>>>>
>> > >>>>>>>>> ok, now I at least understand, why it works with
>> fromElements(...). For
>> > >>>>>>>>> the rest I am not so sure.
>> > >>>>>>>>>
>> > >>>>>>>>>> What this means in your case is that the watermark can only
>> advance if
>> > >>>>>>>>> a new element arrives, because only then is the watermark
>> updated.
>> > >>>>>>>>>
>> > >>>>>>>>> But new elements arrive all the time, about 50/s, or do you
>> mean
>> > >>>>>>>>> something else?
>> > >>>>>>>>>
>> > >>>>>>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to
>> be an ok
>> > >>>>>>>>> choice, if I understand the semantics correctly. It just
>> affects
>> > >>>>>>>>> watermarking in the absence of events, right?
>> > >>>>>>>>>
>> > >>>>>>>>> Cheers,
>> > >>>>>>>>>
>> > >>>>>>>>> Konstantin
>> > >>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>> > >>>>>>>>>> Hi,
>> > >>>>>>>>>> it could be what Gyula mentioned. Let me first go a bit into
>> how the TimestampExtractor works internally.
>> > >>>>>>>>>>
>> > >>>>>>>>>> First, the timestamp extractor internally keeps the value of
>> the last emitted watermark. Then, the semantics of the TimestampExtractor
>> are as follows :
>> > >>>>>>>>>> - the result of extractTimestamp is taken and it replaces
>> the internal timestamp of the element
>> > >>>>>>>>>> - if the result of extractWatermark is larger than the last
>> watermark the new value is emitted as a watermark and the value is stored
>> > >>>>>>>>>> - getCurrentWatermark is called on the specified
>> auto-watermark interval, if the returned value is larger than the last
>> watermark it is emitted and stored as last watermark
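
The three rules above can be sketched as a small driver in plain Java. This is not the actual Flink operator: the Extractor interface and Driver class are hypothetical stand-ins, and passing the freshly extracted timestamp as the second argument of extractWatermark is an assumption.

```java
public class ExtractorSemanticsSketch {

    /** Hypothetical stand-in for the three methods discussed in this thread. */
    interface Extractor<T> {
        long extractTimestamp(T element, long currentTimestamp);
        long extractWatermark(T element, long currentTimestamp);
        long getCurrentWatermark();
    }

    /** Hypothetical driver that keeps the last emitted watermark and applies the three rules. */
    static class Driver<T> {
        private final Extractor<T> extractor;
        private long lastWatermark = Long.MIN_VALUE;

        Driver(Extractor<T> extractor) {
            this.extractor = extractor;
        }

        /** Rules 1 and 2: replace the element timestamp, then maybe advance the watermark. */
        long onElement(T element, long currentTimestamp) {
            long newTimestamp = extractor.extractTimestamp(element, currentTimestamp);
            advance(extractor.extractWatermark(element, newTimestamp));
            return newTimestamp;
        }

        /** Rule 3: called once per auto-watermark interval. */
        void onAutoWatermarkInterval() {
            advance(extractor.getCurrentWatermark());
        }

        /** A candidate is only "emitted" (stored here) if it is larger than the last watermark. */
        private void advance(long candidate) {
            if (candidate > lastWatermark) {
                lastWatermark = candidate;
            }
        }

        long lastWatermark() {
            return lastWatermark;
        }
    }

    /** Mirrors the first extractor in the thread; the element itself is the timestamp in millis. */
    static Extractor<Long> perElementExtractor(final long maxDelay) {
        return new Extractor<Long>() {
            @Override
            public long extractTimestamp(Long element, long currentTimestamp) {
                return element;
            }

            @Override
            public long extractWatermark(Long element, long currentTimestamp) {
                return element - maxDelay;
            }

            @Override
            public long getCurrentWatermark() {
                return Long.MIN_VALUE;
            }
        };
    }

    public static void main(String[] args) {
        Driver<Long> driver = new Driver<>(perElementExtractor(6000L));
        driver.onElement(10000L, Long.MIN_VALUE);
        System.out.println(driver.lastWatermark()); // 4000
        driver.onAutoWatermarkInterval();           // Long.MIN_VALUE is not larger, no change
        driver.onElement(9000L, Long.MIN_VALUE);    // 3000 is not larger, no change
        System.out.println(driver.lastWatermark()); // 4000
    }
}
```

With this extractor only onElement can ever move the watermark, so an instance that receives no elements stays at Long.MIN_VALUE.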
>> > >>>>>>>>>>
>> > >>>>>>>>>> What this means in your case is that the watermark can only
>> advance if a new element arrives, because only then is the watermark
>> updated.
>> > >>>>>>>>>>
>> > >>>>>>>>>> The reason why you see results if you use fromElements is
>> that the window-operator also emits all the windows that it currently has
>> buffered if the program closes. This happens in the case of fromElements
>> because only a finite number of elements is emitted, after which the source
>> closes, thereby finishing the whole program.
>> > >>>>>>>>>>
>> > >>>>>>>>>> Cheers,
>> > >>>>>>>>>> Aljoscha
>> > >>>>>>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com>
>> wrote:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Could this part of the extractor be the problem Aljoscha?
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> @Override
>> > >>>>>>>>>>> public long getCurrentWatermark() {
>> > >>>>>>>>>>>    return Long.MIN_VALUE;
>> > >>>>>>>>>>> }
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Gyula
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Konstantin Knauf <konstantin.kn...@tngtech.com> wrote
>> (on Mon, Nov 16, 2015, 10:39):
>> > >>>>>>>>>>> Hi Aljoscha,
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> thanks for your answer. Yes I am using the same
>> TimestampExtractor-Class.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> The timestamps look good to me. Here is an example.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> {"time": 1447666537260, ...} And parsed:
>> 2015-11-16T10:35:37.260+01:00
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> The order now is
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> stream
>> > >>>>>>>>>>> .map(dummyMapper)
>> > >>>>>>>>>>> .assignTimestamps(...)
>> > >>>>>>>>>>> .timeWindow(...)
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Is there a way to print out the assigned timestamps after
>> > >>>>>>>>>>> stream.assignTimestamps(...)?
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Cheers,
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Konstantin
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>> > >>>>>>>>>>>> Hi,
>> > >>>>>>>>>>>> are you also using the timestamp extractor when you are
>> using env.fromCollection()?
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Could you maybe insert a dummy mapper after the Kafka
>> source that just prints the element and forwards it? To see if the elements
>> come with a good timestamp from Kafka.
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> Cheers,
>> > >>>>>>>>>>>> Aljoscha
>> > >>>>>>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf <
>> konstantin.kn...@tngtech.com> wrote:
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Hi everyone,
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> I am using a very simple TimestampExtractor like [1],
>> which just
>> > >>>>>>>>>>>>> extracts a millis timestamp from a POJO. In my streaming
>> job, I read in
>> > >>>>>>>>>>>>> these POJOs from Kafka using the FlinkKafkaConsumer082
>> like this:
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<>(
>> > >>>>>>>>>>>>>           parameterTool.getRequired("topic"),
>> > >>>>>>>>>>>>>           new AvroPojoDeserializationSchema(),
>> > >>>>>>>>>>>>>           parameterTool.getProperties()));
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> I have timestampEnabled() and the TimeCharacteristic is
>> EventTime,
>> > >>>>>>>>>>>>> AutoWatermarkInterval is 500.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> The problem is, when I do something like:
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>> > >>>>>>>>>>>>> .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>> > >>>>>>>>>>>>> .sum(..)
>> > >>>>>>>>>>>>> .print()
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> env.execute();
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> the windows never get triggered.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> If I use ProcessingTime it works.
>> > >>>>>>>>>>>>> If I use env.fromCollection(...) instead of the
>> KafkaSource it works
>> > >>>>>>>>>>>>> with EventTime, too.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Any ideas what I could be doing wrong are highly
>> appreciated.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Cheers,
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> Konstantin
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> [1]:
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> public class PojoTimestampExtractor implements
>> TimestampExtractor<Pojo> {
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> final private long maxDelay;
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> public  PojoTimestampExtractor(long maxDelay) {
>> > >>>>>>>>>>>>>   this.maxDelay = maxDelay;
>> > >>>>>>>>>>>>> }
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> @Override
>> > >>>>>>>>>>>>> public long extractTimestamp(Pojo pojo, long l) {
>> > >>>>>>>>>>>>>   return pojo.getTime();
>> > >>>>>>>>>>>>> }
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> @Override
>> > >>>>>>>>>>>>> public long extractWatermark(Pojo pojo, long l) {
>> > >>>>>>>>>>>>>   return pojo.getTime() - maxDelay;
>> > >>>>>>>>>>>>> }
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> @Override
>> > >>>>>>>>>>>>> public long getCurrentWatermark() {
>> > >>>>>>>>>>>>>   return Long.MIN_VALUE;
>> > >>>>>>>>>>>>> }
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> --
>> > >>>>>>>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com *
>> +49-174-3413182
>> > >>>>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774
>> Unterföhring
>> > >>>>>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr.
>> Robert Dahlke
>> > >>>>>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>> > >>>>>>>>>>
>> > >>>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>>
>> > >>>>>>>
>> > >>>>>>
>> > >>>>>>
>> > >>>>>
>> > >>>>
>> > >>>>
>> > >>>
>> > >>
>> > >
>> >
>>
>>
>
