Hi Aljoscha,

I changed the TimestampExtractor to save the last seen timestamp and only
emit watermarks from getCurrentWatermark [1], as you suggested. So basically
I now do the opposite of before (watermarks only per event before vs.
watermarks only per auto-watermark interval now). And now it works :). The
question remains why it did not work before. As far as I can see, the issue
is with the first TimestampExtractor itself?!

Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?

Cheers,

Konstantin

[1]

    final private long maxDelay;
    private long lastTimestamp = Long.MIN_VALUE;

    public PojoTimestampExtractor(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    @Override
    public long extractTimestamp(Pojo pojo, long l) {
        lastTimestamp = pojo.getTime();
        return pojo.getTime();
    }

    @Override
    public long extractWatermark(Pojo pojo, long l) {
        return Long.MIN_VALUE;
    }

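    @Override
    public long getCurrentWatermark() {
        // Before the first element, avoid Long.MIN_VALUE - maxDelay wrapping around.
        return lastTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : lastTimestamp - maxDelay;
    }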
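
For reference, here is a minimal, Flink-free sketch of the semantics as
Aljoscha describes them in his mail of 12:31 quoted below: the operator keeps
the last emitted watermark and only forwards a candidate from either
extractWatermark or getCurrentWatermark if it is larger. The names Extractor,
Operator and run, as well as the example timestamps, are made up for
illustration and are not Flink classes; this is my reading of the description,
not the actual operator code.

```java
import java.util.ArrayList;
import java.util.List;

final class WatermarkSemanticsSketch {

    /** Hypothetical stand-in for the three extractor callbacks (not the Flink interface). */
    interface Extractor {
        long extractTimestamp(long eventTime);
        long extractWatermark(long eventTime);
        long getCurrentWatermark();
    }

    /** Watermarks only per element; the periodic callback never advances anything. */
    static final class PerElementExtractor implements Extractor {
        private final long maxDelay;
        PerElementExtractor(long maxDelay) { this.maxDelay = maxDelay; }
        public long extractTimestamp(long t) { return t; }
        public long extractWatermark(long t) { return t - maxDelay; }
        public long getCurrentWatermark() { return Long.MIN_VALUE; }
    }

    /** Watermarks only from the periodic callback, based on the last seen timestamp. */
    static final class PeriodicExtractor implements Extractor {
        private final long maxDelay;
        private long lastTimestamp = Long.MIN_VALUE;
        PeriodicExtractor(long maxDelay) { this.maxDelay = maxDelay; }
        public long extractTimestamp(long t) { lastTimestamp = t; return t; }
        public long extractWatermark(long t) { return Long.MIN_VALUE; }
        public long getCurrentWatermark() {
            // Guard: Long.MIN_VALUE - maxDelay would wrap around to a huge positive value.
            return lastTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : lastTimestamp - maxDelay;
        }
    }

    /** Simplified model of the operator: it remembers the last emitted watermark. */
    static final class Operator {
        private final Extractor extractor;
        private long lastWatermark = Long.MIN_VALUE;
        final List<Long> emitted = new ArrayList<>();

        Operator(Extractor extractor) { this.extractor = extractor; }

        /** Called for every element. */
        void onElement(long eventTime) {
            extractor.extractTimestamp(eventTime);
            maybeEmit(extractor.extractWatermark(eventTime));
        }

        /** Called once per auto-watermark interval. */
        void onTimer() {
            maybeEmit(extractor.getCurrentWatermark());
        }

        /** Neither callback overrides the other; only a larger candidate is emitted. */
        private void maybeEmit(long candidate) {
            if (candidate > lastWatermark) {
                lastWatermark = candidate;
                emitted.add(candidate);
            }
        }
    }

    /** Feeds a fixed, slightly out-of-order sequence and returns the emitted watermarks. */
    static List<Long> run(Extractor extractor) {
        Operator op = new Operator(extractor);
        op.onTimer();            // timer fires before any element has arrived
        op.onElement(10_000L);
        op.onElement(11_000L);
        op.onTimer();
        op.onElement(10_500L);   // late element, must not move the watermark back
        op.onTimer();
        op.onElement(12_000L);
        op.onTimer();
        return op.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(new PerElementExtractor(6_000L))); // [4000, 5000, 6000]
        System.out.println(run(new PeriodicExtractor(6_000L)));   // [5000, 6000]
    }
}
```

In this model both variants advance the watermark as long as elements keep
arriving, and a Long.MIN_VALUE returned from either callback is simply ignored
rather than "overpowering" the other one. So, if the model is right, it does
not by itself explain why the first extractor failed.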


On 16.11.2015 13:37, Aljoscha Krettek wrote:
> Hi,
> yes, at your data rate emitting a watermark for every element should not be a 
> problem. It could become a problem with higher data rates, since the system 
> can get overwhelmed if every element also generates a watermark. In that case 
> I would suggest storing the latest element timestamp in an internal field 
> and only emitting in getCurrentWatermark(), since the watermark interval can 
> then be tuned using the auto-watermark interval setting.
> 
> But that should not be the cause of the problem that you currently have. 
> Would you maybe be willing to send me some (mock) example data and the code 
> so that I can reproduce the problem and have a look at it? You can send it 
> to aljoscha at apache.org.
> 
> Cheers,
> Aljoscha
>> On 16 Nov 2015, at 13:05, Konstantin Knauf <konstantin.kn...@tngtech.com> 
>> wrote:
>>
>> Hi Aljoscha,
>>
>> ok, now I at least understand why it works with fromElements(...). About
>> the rest I am not so sure.
>>
>>> What this means in your case is that the watermark can only advance if
>>> a new element arrives, because only then is the watermark updated.
>>
>> But new elements arrive all the time, about 50/s, or do you mean
>> something else?
>>
>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an OK
>> choice, if I understand the semantics correctly. It just affects
>> watermarking in the absence of events, right?
>>
>> Cheers,
>>
>> Konstantin
>>
>>
>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>>> Hi,
>>> it could be what Gyula mentioned. Let me first go a bit into how the 
>>> TimestampExtractor works internally.
>>>
>>> First, the timestamp extractor internally keeps the value of the last 
>>> emitted watermark. Then, the semantics of the TimestampExtractor are as 
>>> follows:
>>> - the result of extractTimestamp is taken and it replaces the internal 
>>> timestamp of the element
>>> - if the result of extractWatermark is larger than the last watermark, the 
>>> new value is emitted as a watermark and the value is stored
>>> - getCurrentWatermark is called on the specified auto-watermark interval; 
>>> if the returned value is larger than the last watermark, it is emitted and 
>>> stored as the last watermark
>>>
>>> What this means in your case is that the watermark can only advance if a 
>>> new element arrives, because only then is the watermark updated.
>>>
>>> The reason why you see results if you use fromElements is that the 
>>> window-operator also emits all the windows that it currently has buffered 
>>> if the program closes. This happens in the case of fromElements because 
>>> only a finite number of elements is emitted, after which the source closes, 
>>> thereby finishing the whole program.
>>>
>>> Cheers,
>>> Aljoscha
>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>
>>>> Could this part of the extractor be the problem, Aljoscha?
>>>>
>>>> @Override
>>>>    public long getCurrentWatermark() {
>>>>        return Long.MIN_VALUE;
>>>>    }
>>>>
>>>> Gyula
>>>>
>>>> Konstantin Knauf <konstantin.kn...@tngtech.com> wrote (on Mon, 16 Nov 
>>>> 2015, 10:39):
>>>> Hi Aljoscha,
>>>>
>>>> thanks for your answer. Yes I am using the same TimestampExtractor-Class.
>>>>
>>>> The timestamps look good to me. Here is an example:
>>>>
>>>> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
>>>>
>>>> The order now is
>>>>
>>>> stream
>>>> .map(dummyMapper)
>>>> .assignTimestamps(...)
>>>> .timeWindow(...)
>>>>
>>>> Is there a way to print out the assigned timestamps after
>>>> stream.assignTimestamps(...)?
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>>
>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>>>> Hi,
>>>>> are you also using the timestamp extractor when you are using 
>>>>> env.fromCollection()?
>>>>>
>>>>> Could you maybe insert a dummy mapper after the Kafka source that just 
>>>>> prints the element and forwards it? To see if the elements come with a 
>>>>> good timestamp from Kafka.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf 
>>>>>> <konstantin.kn...@tngtech.com> wrote:
>>>>>>
>>>>>> Hi everyone,
>>>>>>
>>>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>>>
>>>>>> I am using a very simple TimestampExtractor like [1], which just
>>>>>> extracts a millis timestamp from a POJO. In my streaming job, I read in
>>>>>> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>>>
>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<>(
>>>>>>               parameterTool.getRequired("topic"),
>>>>>>               new AvroPojoDeserializationSchema(),
>>>>>>               parameterTool.getProperties()));
>>>>>>
>>>>>> I have enabled timestamps, the TimeCharacteristic is EventTime, and
>>>>>> the AutoWatermarkInterval is 500.
>>>>>>
>>>>>> The problem is, when I do something like:
>>>>>>
>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>>> .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>>>>>> .sum(..)
>>>>>> .print()
>>>>>>
>>>>>> env.execute();
>>>>>>
>>>>>> the windows never get triggered.
>>>>>>
>>>>>> If I use ProcessingTime it works.
>>>>>> If I use env.fromCollection(...) instead of the KafkaSource it works
>>>>>> with EventTime, too.
>>>>>>
>>>>>> Any ideas what I could be doing wrong are highly appreciated.
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Konstantin
>>>>>>
>>>>>> [1]:
>>>>>>
>>>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>>>>>>
>>>>>>   final private long maxDelay;
>>>>>>
>>>>>>   public  PojoTimestampExtractor(long maxDelay) {
>>>>>>       this.maxDelay = maxDelay;
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public long extractTimestamp(Pojo pojo, long l) {
>>>>>>       return pojo.getTime();
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public long extractWatermark(Pojo pojo, long l) {
>>>>>>       return pojo.getTime() - maxDelay;
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public long getCurrentWatermark() {
>>>>>>       return Long.MIN_VALUE;
>>>>>>   }
>>>>>> }
>>>>>
>>>>>
>>>>
>>>> --
>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>
>>>
>>
> 
> 

