Hmm, that’s very strange. I’ll continue looking into it.
> On 17 Nov 2015, at 21:42, Konstantin Knauf <konstantin.kn...@tngtech.com> 
> wrote:
> 
> Hi Aljoscha,
> 
> Are you sure? I am running the job from my IDE at the moment.
> 
> If I set
> 
> StreamExecutionEnvironment.setParallelism(1);
> 
> It works with the old TimestampExtractor (returning Long.MIN_VALUE from
> getCurrentWatermark() and emitting a watermark at every record).
> 
> If I set
> 
> StreamExecutionEnvironment.setParallelism(5);
> 
> it does not work.
> 
> So, if I understood you correctly, it is the opposite of what you were
> expecting?!
> 
> Cheers,
> 
> Konstantin
> 
> 
> On 17.11.2015 11:32, Aljoscha Krettek wrote:
>> Hi,
>> actually, the bug is more subtle. Normally, it is not a problem that the 
>> TimestampExtractor sometimes emits a watermark that is lower than the one 
>> before. (This is the result of the bug with Long.MIN_VALUE I mentioned 
>> before). The stream operators wait for watermarks from all upstream 
>> operators and only advance the watermark monotonically in lockstep with 
>> them. This way, the watermark cannot decrease at an operator.
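A minimal plain-Java sketch of the "lockstep" rule described above, assuming each upstream input channel reports watermarks independently and the operator remembers the highest watermark seen per channel. The class name MultiInputWatermarkTracker is hypothetical and this is not Flink's actual implementation: the operator's own watermark is the minimum across channels and is only emitted when it increases, so a lower watermark from one input (for example Long.MIN_VALUE) is never forwarded.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/**
 * Hypothetical model (not Flink's implementation) of an operator that combines
 * watermarks from several upstream input channels.
 */
final class MultiInputWatermarkTracker {
    // Highest watermark seen so far on each input channel (an assumption of this sketch).
    private final long[] channelWatermarks;
    // Watermark this operator has last emitted downstream.
    private long currentWatermark = Long.MIN_VALUE;

    MultiInputWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /**
     * Records a watermark arriving on one channel and returns the operator's new
     * watermark if it advanced, or an empty result if nothing should be emitted.
     */
    OptionalLong onWatermark(int channel, long watermark) {
        // A lower value never moves a channel backwards.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);

        // The operator can only be as far along as its slowest input.
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }

        // Emit only if this moves the operator's watermark forward.
        if (min > currentWatermark) {
            currentWatermark = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }

    long currentWatermark() {
        return currentWatermark;
    }
}
```

For example, after channel 0 reports 10 and channel 1 reports 5, the operator watermark is 5; a later Long.MIN_VALUE from channel 1 emits nothing, and the watermark stays at 5.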
>> 
>> In your case, you have a topology with parallelism 1, I assume. In that case 
>> the operators are chained. (There are no separate operators, but basically
>> only one operator, and elements are passed along via function calls.) In
>> this setting the watermarks are directly forwarded to operators without 
>> going through the logic I mentioned above.
>> 
>> Cheers,
>> Aljoscha
>>> On 16 Nov 2015, at 18:13, Konstantin Knauf <konstantin.kn...@tngtech.com> 
>>> wrote:
>>> 
>>> Hi Aljoscha,
>>> 
>>> I changed the TimestampExtractor to save the last seen timestamp and only
>>> emit watermarks from getCurrentWatermark [1], as you suggested. So basically
>>> I do the opposite of before (before: only per-event watermarks; now: only
>>> auto-watermarks). And now it works :). The question remains why it
>>> did not work before. As far as I can see, it is an issue with the first
>>> TimestampExtractor itself?!
>>> 
>>> Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?
>>> 
>>> Cheers,
>>> 
>>> Konstantin
>>> 
>>> [1]
>>> 
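>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>>> 
>>>   private final long maxDelay;
>>>   // Latest element timestamp seen so far; Long.MIN_VALUE until the first element.
>>>   private long lastTimestamp = Long.MIN_VALUE;
>>> 
>>>   public PojoTimestampExtractor(long maxDelay) {
>>>       this.maxDelay = maxDelay;
>>>   }
>>> 
>>>   @Override
>>>   public long extractTimestamp(Pojo pojo, long l) {
>>>       lastTimestamp = pojo.getTime();
>>>       return lastTimestamp;
>>>   }
>>> 
>>>   @Override
>>>   public long extractWatermark(Pojo pojo, long l) {
>>>       // No per-element watermarks; they are emitted periodically instead.
>>>       return Long.MIN_VALUE;
>>>   }
>>> 
>>>   @Override
>>>   public long getCurrentWatermark() {
>>>       // Guard against overflow: Long.MIN_VALUE - maxDelay would wrap around
>>>       // to a huge positive value before the first element has arrived.
>>>       if (lastTimestamp == Long.MIN_VALUE) {
>>>           return Long.MIN_VALUE;
>>>       }
>>>       return lastTimestamp - maxDelay;
>>>   }
>>> }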
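Before the first element arrives, lastTimestamp in the extractor above still holds Long.MIN_VALUE. Java long arithmetic wraps around silently, so subtracting maxDelay at that point does not yield an even smaller number but one close to Long.MAX_VALUE. The following sketch (the class WatermarkOverflowDemo is hypothetical) shows the wrap-around, a guarded variant, and Math.subtractExact as one way to detect the overflow instead of wrapping.

```java
/**
 * Hypothetical demo of why lastTimestamp - maxDelay needs a guard while
 * lastTimestamp is still Long.MIN_VALUE.
 */
final class WatermarkOverflowDemo {

    /** Unguarded: for Long.MIN_VALUE this wraps around to a huge positive value. */
    static long unguarded(long lastTimestamp, long maxDelay) {
        return lastTimestamp - maxDelay;
    }

    /** Guarded: stays at Long.MIN_VALUE until a real timestamp has been seen. */
    static long guarded(long lastTimestamp, long maxDelay) {
        return lastTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : lastTimestamp - maxDelay;
    }

    /** Math.subtractExact throws ArithmeticException on overflow instead of wrapping. */
    static boolean overflows(long lastTimestamp, long maxDelay) {
        try {
            Math.subtractExact(lastTimestamp, maxDelay);
            return false;
        } catch (ArithmeticException e) {
            return true;
        }
    }
}
```

With maxDelay = 6000, the unguarded call on Long.MIN_VALUE returns Long.MAX_VALUE - 5999, while for a real timestamp such as 1447666537260 both variants return 1447666531260.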
>>> 
>>> 
>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
>>>> Hi,
>>>> yes, at your data rate, emitting a watermark for every element should not
>>>> be a problem. It could become a problem with higher data rates, since the
>>>> system can get overwhelmed if every element also generates a watermark. In
>>>> that case I would suggest storing the latest element timestamp in an
>>>> internal field and only emitting watermarks in getCurrentWatermark(), since
>>>> then the watermark interval can be tuned using the auto-watermark interval
>>>> setting.
>>>> 
>>>> But that should not be the cause of the problem that you currently have.
>>>> Would you maybe be willing to send me some (mock) example data and the
>>>> code so that I can reproduce the problem and have a look at it? You can
>>>> send it to aljoscha at apache.org.
>>>> 
>>>> Cheers,
>>>> Aljoscha
>>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf <konstantin.kn...@tngtech.com> 
>>>>> wrote:
>>>>> 
>>>>> Hi Aljoscha,
>>>>> 
>>>>> OK, now I at least understand why it works with fromElements(...). For
>>>>> the rest I am not so sure.
>>>>> 
>>>>>> What this means in your case is that the watermark can only advance if
>>>>> a new element arrives, because only then is the watermark updated.
>>>>> 
>>>>> But new elements arrive all the time, about 50/s, or do you mean
>>>>> something else?
>>>>> 
>>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an ok
>>>>> choice, if I understand the semantics correctly. It just affects
>>>>> watermarking in the absence of events, right?
>>>>> 
>>>>> Cheers,
>>>>> 
>>>>> Konstantin
>>>>> 
>>>>> 
>>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>>>>>> Hi,
>>>>>> it could be what Gyula mentioned. Let me first go a bit into how the 
>>>>>> TimestampExtractor works internally.
>>>>>> 
>>>>>> First, the timestamp extractor internally keeps the value of the last 
>>>>>> emitted watermark. Then, the semantics of the TimestampExtractor are as 
>>>>>> follows:
>>>>>> - the result of extractTimestamp is taken and it replaces the internal
>>>>>> timestamp of the element
>>>>>> - if the result of extractWatermark is larger than the last watermark,
>>>>>> the new value is emitted as a watermark and stored as the last watermark
>>>>>> - getCurrentWatermark is called on the specified auto-watermark
>>>>>> interval; if the returned value is larger than the last watermark, it is
>>>>>> emitted and stored as the last watermark
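The three rules above can be modelled in plain Java roughly as follows. This is a hypothetical sketch (ExtractorSemanticsModel is not a Flink class) that only tracks which watermarks would be emitted; rule 1, replacing the element timestamp, is not modelled. Feeding it the behaviour of the first extractor in this thread (extractWatermark returns time - maxDelay, getCurrentWatermark returns Long.MIN_VALUE) shows that the periodic call never emits anything, so the watermark only advances when elements arrive.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the watermark rules of a TimestampExtractor-based
 * operator: keep the last emitted watermark and only emit larger values.
 */
final class ExtractorSemanticsModel {
    private long lastEmittedWatermark = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    /** Rule 2: called per element with the result of extractWatermark. */
    void onElement(long extractedWatermark) {
        emitIfLarger(extractedWatermark);
    }

    /** Rule 3: called every auto-watermark interval with getCurrentWatermark()'s result. */
    void onAutoWatermarkTimer(long currentWatermark) {
        emitIfLarger(currentWatermark);
    }

    private void emitIfLarger(long candidate) {
        // Only strictly larger values are emitted and remembered.
        if (candidate > lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
            emitted.add(candidate);
        }
    }

    List<Long> emittedWatermarks() {
        return emitted;
    }
}
```

A timer call with Long.MIN_VALUE never adds anything, and a per-element watermark that is lower than an earlier one is dropped as well.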
>>>>>> 
>>>>>> What this means in your case is that the watermark can only advance if a 
>>>>>> new element arrives, because only then is the watermark updated.
>>>>>> 
>>>>>> The reason why you see results if you use fromElements is that the
>>>>>> window operator also emits all the windows that it currently has
>>>>>> buffered when the program finishes. This happens in the case of
>>>>>> fromElements because only a finite number of elements is emitted, after
>>>>>> which the source closes, thereby finishing the whole program.
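A minimal sketch of that behaviour, assuming a tumbling event-time window that sums values and fires once the watermark reaches the window's last timestamp (end - 1). The class TumblingWindowSumSketch is hypothetical and far simpler than Flink's window operator; it shows that without an advancing watermark nothing fires, while close() at the end of a finite input emits every buffered window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical event-time tumbling-window summer (not Flink's WindowOperator). */
final class TumblingWindowSumSketch {
    private final long sizeMillis;
    // Running sum per window, keyed by window start and ordered by time.
    private final TreeMap<Long, Long> sumsByWindowStart = new TreeMap<>();
    private final List<String> output = new ArrayList<>();

    TumblingWindowSumSketch(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    void onElement(long timestamp, long value) {
        // Assign the element to the window [start, start + size).
        long start = timestamp - Math.floorMod(timestamp, sizeMillis);
        sumsByWindowStart.merge(start, value, Long::sum);
    }

    void onWatermark(long watermark) {
        // Fire every window whose last timestamp is covered by the watermark.
        while (!sumsByWindowStart.isEmpty()) {
            Map.Entry<Long, Long> first = sumsByWindowStart.firstEntry();
            long lastTimestamp = first.getKey() + sizeMillis - 1;
            if (lastTimestamp > watermark) {
                break;
            }
            fire(first.getKey(), first.getValue());
            sumsByWindowStart.pollFirstEntry();
        }
    }

    void close() {
        // End of a finite input: emit everything that is still buffered.
        for (Map.Entry<Long, Long> entry : sumsByWindowStart.entrySet()) {
            fire(entry.getKey(), entry.getValue());
        }
        sumsByWindowStart.clear();
    }

    private void fire(long start, long sum) {
        output.add("[" + start + ", " + (start + sizeMillis) + ") sum=" + sum);
    }

    List<String> output() {
        return output;
    }
}
```

With a 1000 ms window, a watermark of Long.MIN_VALUE leaves both buffered windows untouched, a watermark of 1999 fires only [1000, 2000), and close() flushes the rest.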
>>>>>> 
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>> 
>>>>>>> Could this part of the extractor be the problem Aljoscha?
>>>>>>> 
>>>>>>> @Override
>>>>>>>  public long getCurrentWatermark() {
>>>>>>>      return Long.MIN_VALUE;
>>>>>>>  }
>>>>>>> 
>>>>>>> Gyula
>>>>>>> 
>>>>>>> On Mon, 16 Nov 2015, 10:39, Konstantin Knauf
>>>>>>> <konstantin.kn...@tngtech.com> wrote:
>>>>>>> Hi Aljoscha,
>>>>>>> 
>>>>>>> thanks for your answer. Yes, I am using the same
>>>>>>> TimestampExtractor class.
>>>>>>> 
>>>>>>> The timestamps look good to me. Here is an example:
>>>>>>> 
>>>>>>> {"time": 1447666537260, ...}, parsed: 2015-11-16T10:35:37.260+01:00
>>>>>>> 
>>>>>>> The order now is
>>>>>>> 
>>>>>>> stream
>>>>>>> .map(dummyMapper)
>>>>>>> .assignTimestamps(...)
>>>>>>> .timeWindow(...)
>>>>>>> 
>>>>>>> Is there a way to print out the assigned timestamps after
>>>>>>> stream.assignTimestamps(...)?
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> 
>>>>>>> Konstantin
>>>>>>> 
>>>>>>> 
>>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>>>>>>> Hi,
>>>>>>>> are you also using the timestamp extractor when you are using
>>>>>>>> env.fromCollection()?
>>>>>>>> 
>>>>>>>> Could you maybe insert a dummy mapper after the Kafka source that just
>>>>>>>> prints the element and forwards it? That way we can see whether the
>>>>>>>> elements arrive from Kafka with a good timestamp.
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf 
>>>>>>>>> <konstantin.kn...@tngtech.com> wrote:
>>>>>>>>> 
>>>>>>>>> Hi everyone,
>>>>>>>>> 
>>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>>>>>> 
>>>>>>>>> I am using a very simple TimestampExtractor like [1], which just
>>>>>>>>> extracts a millisecond timestamp from a POJO. In my streaming job, I
>>>>>>>>> read these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>>>>>> 
>>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<>(
>>>>>>>>>         parameterTool.getRequired("topic"),
>>>>>>>>>         new AvroPojoDeserializationSchema(),
>>>>>>>>>         parameterTool.getProperties()));
>>>>>>>>> 
>>>>>>>>> I have timestampEnabled() set, the TimeCharacteristic is EventTime, and
>>>>>>>>> the AutoWatermarkInterval is 500 ms.
>>>>>>>>> 
>>>>>>>>> The problem is, when I do something like:
>>>>>>>>> 
>>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>>>>>>         .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>>>>>>>>>         .sum(..)
>>>>>>>>>         .print();
>>>>>>>>> 
>>>>>>>>> env.execute();
>>>>>>>>> 
>>>>>>>>> the windows never get triggered.
>>>>>>>>> 
>>>>>>>>> If I use ProcessingTime it works.
>>>>>>>>> If I use env.fromCollection(...) instead of the KafkaSource it works
>>>>>>>>> with EventTime, too.
>>>>>>>>> 
>>>>>>>>> Any ideas what I could be doing wrong are highly appreciated.
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> 
>>>>>>>>> Konstantin
>>>>>>>>> 
>>>>>>>>> [1]:
>>>>>>>>> 
>>>>>>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>>>>>>>>> 
>>>>>>>>>     private final long maxDelay;
>>>>>>>>> 
>>>>>>>>>     public PojoTimestampExtractor(long maxDelay) {
>>>>>>>>>         this.maxDelay = maxDelay;
>>>>>>>>>     }
>>>>>>>>> 
>>>>>>>>>     @Override
>>>>>>>>>     public long extractTimestamp(Pojo pojo, long l) {
>>>>>>>>>         // Use the event time carried in the POJO (milliseconds since epoch).
>>>>>>>>>         return pojo.getTime();
>>>>>>>>>     }
>>>>>>>>> 
>>>>>>>>>     @Override
>>>>>>>>>     public long extractWatermark(Pojo pojo, long l) {
>>>>>>>>>         // Emit a watermark with every element, lagging maxDelay behind it.
>>>>>>>>>         return pojo.getTime() - maxDelay;
>>>>>>>>>     }
>>>>>>>>> 
>>>>>>>>>     @Override
>>>>>>>>>     public long getCurrentWatermark() {
>>>>>>>>>         // No periodic watermarks: Long.MIN_VALUE is never larger than the last one.
>>>>>>>>>         return Long.MIN_VALUE;
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>> Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>> Registered office: Unterföhring * Amtsgericht München * HRB 135082
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>>> 
>>> 
>> 
>> 
> 
