Hi,
yes, unfortunately there is a bug in the timestamp extraction operator: when it
calls getCurrentWatermark(), it sets the “last-seen watermark” to Long.MIN_VALUE
even though it should not.

I’m opening an issue and adding a fix to the latest master and to the branch for
the 0.10.x bugfix releases. Sorry for the inconvenience.

Cheers,
Aljoscha

> On 16 Nov 2015, at 18:13, Konstantin Knauf <konstantin.kn...@tngtech.com> 
> wrote:
> 
> Hi Aljoscha,
> 
> I changed the TimestampExtractor to save the last seen timestamp and to emit
> watermarks only in getCurrentWatermark() [1], as you suggested. So basically I
> do the opposite of before (previously only per-event watermarks, now only
> watermarks on the auto-watermark interval). And now it works :). The question
> remains why it did not work before. As far as I can see, it is an issue with
> the first TimestampExtractor itself?!
> 
> Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?
> 
> Cheers,
> 
> Konstantin
> 
> [1]
> 
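>    public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
> 
>       final private long maxDelay;
>       // Timestamp of the most recent element; Long.MIN_VALUE until the first element arrives.
>       private long lastTimestamp = Long.MIN_VALUE;
> 
>       public PojoTimestampExtractor(long maxDelay) {
>           this.maxDelay = maxDelay;
>       }
> 
>       @Override
>       public long extractTimestamp(Pojo pojo, long currentTimestamp) {
>           lastTimestamp = pojo.getTime();
>           return pojo.getTime();
>       }
> 
>       @Override
>       public long extractWatermark(Pojo pojo, long currentTimestamp) {
>           // No per-element watermarks; watermarks come only from getCurrentWatermark().
>           return Long.MIN_VALUE;
>       }
> 
>       @Override
>       public long getCurrentWatermark() {
>           // Guard: Long.MIN_VALUE - maxDelay would wrap around to a huge positive value.
>           if (lastTimestamp == Long.MIN_VALUE) {
>               return Long.MIN_VALUE;
>           }
>           return lastTimestamp - maxDelay;
>       }
>    }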
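Before the first element arrives, lastTimestamp still holds Long.MIN_VALUE, and Java's long subtraction silently wraps around, so an unguarded `lastTimestamp - maxDelay` turns into a huge positive watermark. A minimal sketch in plain Java (the class name WatermarkOverflowDemo is hypothetical and not part of Flink) comparing an unguarded and a guarded computation, using the 6000 ms maxDelay and the sample event time from this thread:

```java
// Hypothetical demo class, not part of Flink: shows why the watermark computation needs a guard.
public class WatermarkOverflowDemo {

    // Unguarded: wraps around when lastTimestamp is still Long.MIN_VALUE.
    static long unguardedWatermark(long lastTimestamp, long maxDelay) {
        return lastTimestamp - maxDelay;
    }

    // Guarded: reports "no watermark yet" until an element has been seen.
    static long guardedWatermark(long lastTimestamp, long maxDelay) {
        if (lastTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return lastTimestamp - maxDelay;
    }

    public static void main(String[] args) {
        long maxDelay = 6000L;
        System.out.println(unguardedWatermark(Long.MIN_VALUE, maxDelay)); // 9223372036854769808
        System.out.println(guardedWatermark(Long.MIN_VALUE, maxDelay));   // -9223372036854775808
        System.out.println(guardedWatermark(1447666537260L, maxDelay));   // 1447666531260
    }
}
```

If such a wrapped value were ever emitted, every later element would be behind the watermark, so the guard is cheap insurance.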
> 
> 
> On 16.11.2015 13:37, Aljoscha Krettek wrote:
>> Hi,
>> yes, at your data rate emitting a watermark for every element should not be
>> a problem. It could become a problem with higher data rates, since the system
>> can get overwhelmed if every element also generates a watermark. In that
>> case I would suggest storing the latest element timestamp in an internal
>> field and only emitting watermarks in getCurrentWatermark(), since then the
>> watermark interval can be tuned using the auto-watermark interval setting.
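To put rough numbers on this, here is a hypothetical simulation in plain Java (not Flink code). It assumes event time advances exactly with processing time, one element every 20 ms (the roughly 50 elements/s mentioned in this thread), a 500 ms auto-watermark interval and a 6000 ms maxDelay, and counts how many watermarks each strategy emits over 10 seconds:

```java
// Hypothetical simulation (plain Java, not Flink code) comparing watermark counts.
public class WatermarkRateSketch {

    /** Returns {perElementCount, periodicCount} for the simulated stream. */
    static int[] simulate(long durationMs, long elementEveryMs, long intervalMs, long maxDelay) {
        long lastPerElement = Long.MIN_VALUE; // last watermark of the per-element strategy
        long lastPeriodic = Long.MIN_VALUE;   // last watermark of the periodic strategy
        long latestTs = Long.MIN_VALUE;       // latest element timestamp, kept in a field
        int perElement = 0;
        int periodic = 0;
        for (long now = 1; now <= durationMs; now++) {
            if (now % elementEveryMs == 0) {
                // An element with event time == now arrives.
                latestTs = now;
                long wm = now - maxDelay;
                if (wm > lastPerElement) {
                    lastPerElement = wm;
                    perElement++;
                }
            }
            if (now % intervalMs == 0 && latestTs != Long.MIN_VALUE) {
                // Auto-watermark timer fires: emit only if the watermark advanced.
                long wm = latestTs - maxDelay;
                if (wm > lastPeriodic) {
                    lastPeriodic = wm;
                    periodic++;
                }
            }
        }
        return new int[] {perElement, periodic};
    }

    public static void main(String[] args) {
        int[] counts = simulate(10_000L, 20L, 500L, 6000L);
        // prints "per element: 500, periodic: 20"
        System.out.println("per element: " + counts[0] + ", periodic: " + counts[1]);
    }
}
```

Under these assumptions the per-element count grows with the data rate, while the periodic count depends only on the interval.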
>> 
>> But that should not be the cause of the problem you are currently seeing.
>> Would you be willing to send me some (mock) example data and the code so
>> that I can reproduce the problem and have a look at it? You can send it to
>> aljoscha at apache.org.
>> 
>> Cheers,
>> Aljoscha
>>> On 16 Nov 2015, at 13:05, Konstantin Knauf <konstantin.kn...@tngtech.com> 
>>> wrote:
>>> 
>>> Hi Aljoscha,
>>> 
>>> OK, now I at least understand why it works with fromElements(...). For
>>> the rest I am not so sure.
>>> 
>>>> What this means in your case is that the watermark can only advance if
>>>> a new element arrives, because only then is the watermark updated.
>>> 
>>> But new elements arrive all the time, about 50/s, or do you mean
>>> something else?
>>> 
>>> Returning Long.MIN_VALUE from getCurrentWatermark still seems to be an OK
>>> choice, if I understand the semantics correctly. It only affects
>>> watermarking in the absence of events, right?
>>> 
>>> Cheers,
>>> 
>>> Konstantin
>>> 
>>> 
>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>>>> Hi,
>>>> it could be what Gyula mentioned. Let me first go a bit into how the 
>>>> TimestampExtractor works internally.
>>>> 
>>>> First, the timestamp extractor internally keeps the value of the last
>>>> emitted watermark. The semantics of the TimestampExtractor are then as
>>>> follows:
>>>> - the result of extractTimestamp replaces the internal timestamp of the
>>>> element
>>>> - if the result of extractWatermark is larger than the last watermark, the
>>>> new value is emitted as a watermark and stored
>>>> - getCurrentWatermark is called at the configured auto-watermark interval;
>>>> if the returned value is larger than the last watermark, it is emitted and
>>>> stored as the last watermark
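The three rules above can be modelled in a few lines of plain Java. This is a hypothetical, simplified sketch (WatermarkModel and its methods are made-up names, not Flink's actual operator code) showing that when getCurrentWatermark() always returns Long.MIN_VALUE, the timer never advances the watermark, so only arriving elements can:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the extractor semantics; not Flink's actual operator.
public class WatermarkModel {

    private long lastWatermark = Long.MIN_VALUE;        // last emitted watermark
    private final List<Long> emitted = new ArrayList<>();

    // Per element: the extractWatermark() result is emitted only if it advances the watermark.
    void onElement(long extractedWatermark) {
        emitIfLarger(extractedWatermark);
    }

    // Per auto-watermark tick: the getCurrentWatermark() result is emitted only if it advances.
    void onTimer(long currentWatermark) {
        emitIfLarger(currentWatermark);
    }

    private void emitIfLarger(long candidate) {
        if (candidate > lastWatermark) {
            lastWatermark = candidate;
            emitted.add(candidate);
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        long maxDelay = 6000L;
        WatermarkModel model = new WatermarkModel();
        // Original extractor: extractWatermark = time - maxDelay, getCurrentWatermark = Long.MIN_VALUE.
        model.onElement(10_000L - maxDelay); // emits 4000
        model.onTimer(Long.MIN_VALUE);       // no-op: never larger than the last watermark
        model.onTimer(Long.MIN_VALUE);       // still a no-op while no elements arrive
        model.onElement(12_000L - maxDelay); // emits 6000
        System.out.println(model.emitted()); // [4000, 6000]
    }
}
```

In this model, a getCurrentWatermark() that tracks the latest element timestamp would let the timer advance the watermark as well.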
>>>> 
>>>> What this means in your case is that the watermark can only advance if a 
>>>> new element arrives, because only then is the watermark updated.
>>>> 
>>>> The reason you see results when you use fromElements is that the
>>>> window operator also emits all windows it currently has buffered
>>>> when the program shuts down. This happens with fromElements because
>>>> only a finite number of elements is emitted, after which the source
>>>> closes, thereby finishing the whole program.
>>>> 
>>>> Cheers,
>>>> Aljoscha
>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>> 
>>>>> Could this part of the extractor be the problem, Aljoscha?
>>>>> 
>>>>> @Override
>>>>>   public long getCurrentWatermark() {
>>>>>       return Long.MIN_VALUE;
>>>>>   }
>>>>> 
>>>>> Gyula
>>>>> 
>>>>> Konstantin Knauf <konstantin.kn...@tngtech.com> wrote (Mon, 16 Nov 2015,
>>>>> 10:39):
>>>>> Hi Aljoscha,
>>>>> 
>>>>> thanks for your answer. Yes, I am using the same TimestampExtractor class.
>>>>> 
>>>>> The timestamps look good to me. Here is an example:
>>>>> 
>>>>> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
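A quick way to double-check that the "time" field is in epoch milliseconds is to convert it with java.time. The sketch below (TimestampCheck is a hypothetical name) also computes the start of the 1-second window the element would land in, under the assumption, not confirmed in this thread, that tumbling windows are aligned to the epoch (start = ts - ts % size):

```java
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical helper: sanity-check an epoch-millis timestamp and its 1-second window.
public class TimestampCheck {

    static String toPlusOne(long epochMillis) {
        // +01:00 matches the offset of the parsed value in this message (CET in November).
        return Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.ofHours(1)).toString();
    }

    // Assumption: tumbling windows are aligned to the epoch.
    static long windowStart(long epochMillis, long sizeMillis) {
        return epochMillis - (epochMillis % sizeMillis);
    }

    public static void main(String[] args) {
        long time = 1447666537260L;
        System.out.println(toPlusOne(time));           // 2015-11-16T10:35:37.260+01:00
        System.out.println(windowStart(time, 1000L));  // 1447666537000
    }
}
```

If the conversion lands in 1970, the field is in seconds rather than milliseconds.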
>>>>> 
>>>>> The order now is:
>>>>> 
>>>>> stream
>>>>> .map(dummyMapper)
>>>>> .assignTimestamps(...)
>>>>> .timeWindow(...)
>>>>> 
>>>>> Is there a way to print out the assigned timestamps after
>>>>> stream.assignTimestamps(...)?
>>>>> 
>>>>> Cheers,
>>>>> 
>>>>> Konstantin
>>>>> 
>>>>> 
>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>>>>> Hi,
>>>>>> are you also using the timestamp extractor when you are using
>>>>>> env.fromCollection()?
>>>>>> 
>>>>>> Could you maybe insert a dummy mapper after the Kafka source that just
>>>>>> prints the element and forwards it? That way we can see whether the
>>>>>> elements arrive from Kafka with a good timestamp.
>>>>>> 
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf 
>>>>>>> <konstantin.kn...@tngtech.com> wrote:
>>>>>>> 
>>>>>>> Hi everyone,
>>>>>>> 
>>>>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>>>> 
>>>>>>> I am using a very simple TimestampExtractor like [1], which just
>>>>>>> extracts a millis timestamp from a POJO. In my streaming job, I read in
>>>>>>> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>>>> 
>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<>(
>>>>>>>              parameterTool.getRequired("topic"),
>>>>>>>              new AvroPojoDeserializationSchema(),
>>>>>>>              parameterTool.getProperties()));
>>>>>>> 
>>>>>>> I have timestamps enabled, the TimeCharacteristic is EventTime, and the
>>>>>>> AutoWatermarkInterval is 500 ms.
>>>>>>> 
>>>>>>> The problem is, when I do something like:
>>>>>>> 
>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>>>> .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>>>>>>> .sum(..)
>>>>>>> .print();
>>>>>>> 
>>>>>>> env.execute();
>>>>>>> 
>>>>>>> the windows never get triggered.
>>>>>>> 
>>>>>>> If I use ProcessingTime it works.
>>>>>>> If I use env.fromCollection(...) instead of the KafkaSource it works
>>>>>>> with EventTime, too.
>>>>>>> 
>>>>>>> Any ideas about what I could be doing wrong would be highly appreciated.
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> 
>>>>>>> Konstantin
>>>>>>> 
>>>>>>> [1]:
>>>>>>> 
>>>>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>>>>>>> 
>>>>>>>  final private long maxDelay;
>>>>>>> 
>>>>>>>  public PojoTimestampExtractor(long maxDelay) {
>>>>>>>      this.maxDelay = maxDelay;
>>>>>>>  }
>>>>>>> 
>>>>>>>  @Override
>>>>>>>  public long extractTimestamp(Pojo pojo, long l) {
>>>>>>>      return pojo.getTime();
>>>>>>>  }
>>>>>>> 
>>>>>>>  @Override
>>>>>>>  public long extractWatermark(Pojo pojo, long l) {
>>>>>>>      // Per-element watermark: event time minus the allowed delay.
>>>>>>>      return pojo.getTime() - maxDelay;
>>>>>>>  }
>>>>>>> 
>>>>>>>  @Override
>>>>>>>  public long getCurrentWatermark() {
>>>>>>>      // Never advances the watermark on the auto-watermark timer.
>>>>>>>      return Long.MIN_VALUE;
>>>>>>>  }
>>>>>>> }
>>>>>> 
>>>>>> 
>>>>> 
>>>>> --
>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>> 
>>>> 
>>> 
>> 
>> 
> 
