Hi Aljoscha,

sure, will do. I have not found a solution either. I won't have time to put
a minimal example together before the weekend, though.

Cheers,

Konstantin

On 25.11.2015 19:10, Aljoscha Krettek wrote:
> Hi Konstantin,
> I still haven’t come up with an explanation for the behavior. Could you maybe 
> send me example code (and example data, if it is necessary to reproduce the 
> problem)? This would really help me pinpoint the problem.
> 
> Cheers,
> Aljoscha
>> On 17 Nov 2015, at 21:42, Konstantin Knauf <konstantin.kn...@tngtech.com> 
>> wrote:
>>
>> Hi Aljoscha,
>>
>> Are you sure? I am running the job from my IDE at the moment.
>>
>> If I set
>>
>> StreamExecutionEnvironment.setParallelism(1);
>>
>> it works with the old TimestampExtractor (returning Long.MIN_VALUE from
>> getCurrentWatermark() and emitting a watermark at every record).
>>
>> If I set
>>
>> StreamExecutionEnvironment.setParallelism(5);
>>
>> it does not work.
>>
>> So, if I understood you correctly, it is the opposite of what you were
>> expecting?!
>>
>> Cheers,
>>
>> Konstantin
>>
>>
>> On 17.11.2015 11:32, Aljoscha Krettek wrote:
>>> Hi,
>>> actually, the bug is more subtle. Normally, it is not a problem that the 
>>> TimestampExtractor sometimes emits a watermark that is lower than the one 
>>> before. (This is the result of the bug with Long.MIN_VALUE I mentioned 
>>> before). The stream operators wait for watermarks from all upstream 
>>> operators and only advance the watermark monotonically in lockstep with 
>>> them. This way, the watermark cannot decrease at an operator.
>>>
>>> In your case, you have a topology with parallelism 1, I assume. In that 
>>> case the operators are chained. (There are no separate operators but 
>>> basically only one operator and element transmission happens in function 
>>> calls). In this setting the watermarks are directly forwarded to operators 
>>> without going through the logic I mentioned above.
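
The lockstep logic described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not Flink's actual operator code: the class name WatermarkTracker and the method onWatermark are hypothetical. Each upstream input keeps its own highest watermark, and the operator's watermark is the minimum over all inputs and is only ever moved forward.

```java
import java.util.Arrays;

/** Hypothetical illustration of per-input watermark tracking; not Flink's code. */
final class WatermarkTracker {
    // Highest watermark seen so far on each upstream input.
    private final long[] inputWatermarks;
    // Watermark of this operator; it never decreases.
    private long currentWatermark = Long.MIN_VALUE;

    WatermarkTracker(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input and returns the operator's watermark. */
    long onWatermark(int input, long watermark) {
        // A lower watermark from the same input is ignored.
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        // Advance only when the minimum over all inputs has grown.
        if (min > currentWatermark) {
            currentWatermark = min;
        }
        return currentWatermark;
    }

    public static void main(String[] args) {
        WatermarkTracker tracker = new WatermarkTracker(2);
        tracker.onWatermark(0, 100L);                      // input 1 has not reported yet
        System.out.println(tracker.onWatermark(1, 80L));   // prints 80
        System.out.println(tracker.onWatermark(0, 50L));   // prints 80, the lower value is absorbed
    }
}
```

In a chained topology, as described above, a watermark would be handed to the next operator directly, so a step like this would not get the chance to absorb a decreasing value.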
>>>
>>> Cheers,
>>> Aljoscha
>>>> On 16 Nov 2015, at 18:13, Konstantin Knauf <konstantin.kn...@tngtech.com> 
>>>> wrote:
>>>>
>>>> Hi Aljoscha,
>>>>
>>>> I changed the TimestampExtractor to save the lastSeenTimestamp and only
>>>> emit watermarks via getCurrentWatermark [1], as you suggested. So basically
>>>> I do the opposite of what I did before (watermarks only per event vs.
>>>> watermarks only per auto-watermark interval). And now it works :). The
>>>> question remains why it did not work before. As far as I can see, it is
>>>> an issue with the first TimestampExtractor itself?!
>>>>
>>>> Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>> [1]
>>>>
>>>>   final private long maxDelay;
>>>>   private long lastTimestamp = Long.MIN_VALUE;
>>>>
>>>>   public PojoTimestampExtractor(long maxDelay) {
>>>>       this.maxDelay = maxDelay;
>>>>   }
>>>>
>>>>   @Override
>>>>   public long extractTimestamp(Pojo pojo, long l) {
>>>>       lastTimestamp = pojo.getTime();
>>>>       return pojo.getTime();
>>>>   }
>>>>
>>>>   @Override
>>>>   public long extractWatermark(Pojo pojo, long l) {
>>>>       return Long.MIN_VALUE;
>>>>   }
>>>>
>>>>   @Override
>>>>   public long getCurrentWatermark() {
>>>>       return lastTimestamp - maxDelay;
>>>>   }
>>>>
>>>>
>>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
>>>>> Hi,
>>>>> yes, at your data-rate emitting a watermark for every element should not 
>>>>> be a problem. It could become a problem with higher data-rates since the 
>>>>> system can get overwhelmed if every element also generates a watermark. 
>>>>> In that case I would suggest storing the latest element timestamp in an 
>>>>> internal field and only emitting in getCurrentWatermark(), since the 
>>>>> watermark interval can then be tuned using the auto-watermark interval 
>>>>> setting.
>>>>>
>>>>> But that should not be the cause of the problem that you currently have. 
>>>>> Would you maybe be willing to send me some (mock) example data and the 
>>>>> code so that I can reproduce the problem and have a look at it? You can 
>>>>> send it to aljoscha at apache.org.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf 
>>>>>> <konstantin.kn...@tngtech.com> wrote:
>>>>>>
>>>>>> Hi Aljoscha,
>>>>>>
>>>>>> ok, now I at least understand, why it works with fromElements(...). For
>>>>>> the rest I am not so sure.
>>>>>>
>>>>>>> What this means in your case is that the watermark can only advance if
>>>>>> a new element arrives, because only then is the watermark updated.
>>>>>>
>>>>>> But new elements arrive all the time, about 50/s, or do you mean
>>>>>> something else?
>>>>>>
>>>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an ok
>>>>>> choice, if I understand the semantics correctly. It just affects
>>>>>> watermarking in the absence of events, right?
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Konstantin
>>>>>>
>>>>>>
>>>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>>>>>>> Hi,
>>>>>>> it could be what Gyula mentioned. Let me first go a bit into how the 
>>>>>>> TimestampExtractor works internally.
>>>>>>>
>>>>>>> First, the timestamp extractor internally keeps the value of the last 
>>>>>>> emitted watermark. Then, the semantics of the TimestampExtractor are as 
>>>>>>> follows :
>>>>>>> - the result of extractTimestamp is taken and it replaces the internal 
>>>>>>> timestamp of the element
>>>>>>> - if the result of extractWatermark is larger than the last watermark 
>>>>>>> the new value is emitted as a watermark and the value is stored
>>>>>>> - getCurrentWatermark is called on the specified auto-watermark 
>>>>>>> interval, if the returned value is larger than the last watermark it is 
>>>>>>> emitted and stored as last watermark
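
The three rules above can be simulated in plain Java as a rough sketch. Everything here is an assumption made for illustration: SketchExtractor is a local stand-in that only mirrors the three method names from this thread, ExtractorDriver is hypothetical, and passing the extracted timestamp as the second argument of extractWatermark is a guess. It is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Local stand-in with the three methods discussed in this thread; not Flink's interface. */
interface SketchExtractor<T> {
    long extractTimestamp(T element, long currentTimestamp);
    long extractWatermark(T element, long currentTimestamp);
    long getCurrentWatermark();
}

/** Hypothetical driver that applies the rules listed above. */
final class ExtractorDriver<T> {
    private final SketchExtractor<T> extractor;
    private long lastWatermark = Long.MIN_VALUE;   // last emitted watermark
    final List<Long> emitted = new ArrayList<>();  // watermarks emitted so far

    ExtractorDriver(SketchExtractor<T> extractor) {
        this.extractor = extractor;
    }

    /** Per element: replace the timestamp, then possibly emit a watermark. */
    long onElement(T element, long currentTimestamp) {
        long timestamp = extractor.extractTimestamp(element, currentTimestamp);
        // Assumption: the freshly extracted timestamp is passed along here.
        emitIfLarger(extractor.extractWatermark(element, timestamp));
        return timestamp;
    }

    /** Called once per auto-watermark interval. */
    void onTimer() {
        emitIfLarger(extractor.getCurrentWatermark());
    }

    private void emitIfLarger(long candidate) {
        if (candidate > lastWatermark) {
            lastWatermark = candidate;
            emitted.add(candidate);
        }
    }

    public static void main(String[] args) {
        final long maxDelay = 6000L;
        // Same shape as the extractor in [1]: per-element watermarks only.
        ExtractorDriver<Long> driver = new ExtractorDriver<>(new SketchExtractor<Long>() {
            public long extractTimestamp(Long e, long cur) { return e; }
            public long extractWatermark(Long e, long cur) { return e - maxDelay; }
            public long getCurrentWatermark() { return Long.MIN_VALUE; }
        });
        driver.onElement(10_000L, -1L);
        driver.onTimer();                // Long.MIN_VALUE never exceeds the last watermark
        driver.onElement(9_000L, -1L);   // 3000 is not larger than 4000, nothing emitted
        driver.onElement(12_000L, -1L);
        System.out.println(driver.emitted); // prints [4000, 6000]
    }
}
```

Under these assumptions, returning Long.MIN_VALUE from getCurrentWatermark() never emits anything, so the watermark advances only when an element arrives.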
>>>>>>>
>>>>>>> What this means in your case is that the watermark can only advance if 
>>>>>>> a new element arrives, because only then is the watermark updated.
>>>>>>>
>>>>>>> The reason why you see results if you use fromElements is that the 
>>>>>>> window-operator also emits all the windows that it currently has 
>>>>>>> buffered if the program closes. This happens in the case of 
>>>>>>> fromElements because only a finite number of elements is emitted, after 
>>>>>>> which the source closes, thereby finishing the whole program.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Could this part of the extractor be the problem Aljoscha?
>>>>>>>>
>>>>>>>> @Override
>>>>>>>>  public long getCurrentWatermark() {
>>>>>>>>      return Long.MIN_VALUE;
>>>>>>>>  }
>>>>>>>>
>>>>>>>> Gyula
>>>>>>>>
>>>>>>>> Konstantin Knauf <konstantin.kn...@tngtech.com> wrote (on Mon, 
>>>>>>>> 16 Nov 2015, 10:39):
>>>>>>>> Hi Aljoscha,
>>>>>>>>
>>>>>>>> thanks for your answer. Yes I am using the same 
>>>>>>>> TimestampExtractor-Class.
>>>>>>>>
>>>>>>>> The timestamps look good to me. Here is an example.
>>>>>>>>
>>>>>>>> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
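
A conversion like the one above can be double-checked with java.time. This is only a small sketch: the class name TimestampCheck and the fixed +01:00 offset are assumptions chosen to match the example value.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

/** Hypothetical helper for eyeballing epoch-millisecond timestamps. */
final class TimestampCheck {
    /** Interprets epochMillis as milliseconds since 1970-01-01T00:00:00Z. */
    static OffsetDateTime toOffsetDateTime(long epochMillis, ZoneOffset offset) {
        return Instant.ofEpochMilli(epochMillis).atOffset(offset);
    }

    public static void main(String[] args) {
        // prints 2015-11-16T10:35:37.260+01:00
        System.out.println(toOffsetDateTime(1447666537260L, ZoneOffset.ofHours(1)));
    }
}
```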
>>>>>>>>
>>>>>>>> The order now is
>>>>>>>>
>>>>>>>> stream
>>>>>>>> .map(dummyMapper)
>>>>>>>> .assignTimestamps(...)
>>>>>>>> .timeWindow(...)
>>>>>>>>
>>>>>>>> Is there a way to print out the assigned timestamps after
>>>>>>>> stream.assignTimestamps(...)?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>>
>>>>>>>> Konstantin
>>>>>>>>
>>>>>>>>
>>>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>>>>>>>> Hi,
>>>>>>>>> are you also using the timestamp extractor when you are using 
>>>>>>>>> env.fromCollection()?
>>>>>>>>>
>>>>>>>>> Could you maybe insert a dummy mapper after the Kafka source that 
>>>>>>>>> just prints the element and forwards it? To see if the elements come 
>>>>>>>>> with a good timestamp from Kafka.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf 
>>>>>>>>>> <konstantin.kn...@tngtech.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi everyone,
>>>>>>>>>>
>>>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>>>>>>>
>>>>>>>>>> I am using a very simple TimestampExtractor like [1], which just
>>>>>>>>>> extracts a millis timestamp from a POJO. In my streaming job, I read 
>>>>>>>>>> in
>>>>>>>>>> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>>>>>>>
>>>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<>(
>>>>>>>>>>             parameterTool.getRequired("topic"),
>>>>>>>>>>             new AvroPojoDeserializationSchema(),
>>>>>>>>>>             parameterTool.getProperties()));
>>>>>>>>>>
>>>>>>>>>> I have timestampEnabled() and the TimeCharacteristics are EventTime,
>>>>>>>>>> AutoWatermarkInterval is 500.
>>>>>>>>>>
>>>>>>>>>> The problem is, when I do something like:
>>>>>>>>>>
>>>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>>>>>>> .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>>>>>>>>>> .sum(..)
>>>>>>>>>> .print();
>>>>>>>>>>
>>>>>>>>>> env.execute();
>>>>>>>>>>
>>>>>>>>>> the windows never get triggered.
>>>>>>>>>>
>>>>>>>>>> If I use ProcessingTime it works.
>>>>>>>>>> If I use env.fromCollection(...) instead of the KafkaSource it works
>>>>>>>>>> with EventTime, too.
>>>>>>>>>>
>>>>>>>>>> Any ideas what I could be doing wrong are highly appreciated.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>>
>>>>>>>>>> Konstantin
>>>>>>>>>>
>>>>>>>>>> [1]:
>>>>>>>>>>
>>>>>>>>>> public class PojoTimestampExtractor implements 
>>>>>>>>>> TimestampExtractor<Pojo> {
>>>>>>>>>>
>>>>>>>>>> final private long maxDelay;
>>>>>>>>>>
>>>>>>>>>> public  PojoTimestampExtractor(long maxDelay) {
>>>>>>>>>>     this.maxDelay = maxDelay;
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> @Override
>>>>>>>>>> public long extractTimestamp(Pojo pojo, long l) {
>>>>>>>>>>     return pojo.getTime();
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> @Override
>>>>>>>>>> public long extractWatermark(Pojo pojo, long l) {
>>>>>>>>>>     return pojo.getTime() - maxDelay;
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> @Override
>>>>>>>>>> public long getCurrentWatermark() {
>>>>>>>>>>     return Long.MIN_VALUE;
>>>>>>>>>> }
>>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
> 
> 

