Hi Aljoscha,

Are you sure? I am running the job from my IDE at the moment.

If I set

StreamExecutionEnvironment.setParallelism(1);

it works with the old TimestampExtractor (returning Long.MIN_VALUE from
getCurrentWatermark() and emitting a watermark for every record).

If I set

StreamExecutionEnvironment.setParallelism(5);

it does not work.

So, if I understood you correctly, it is the opposite of what you were
expecting?!

Cheers,

Konstantin


On 17.11.2015 11:32, Aljoscha Krettek wrote:
> Hi,
> actually, the bug is more subtle. Normally, it is not a problem that the 
> TimestampExtractor sometimes emits a watermark that is lower than the one 
> before. (This is the result of the bug with Long.MIN_VALUE I mentioned 
> before). The stream operators wait for watermarks from all upstream operators 
> and only advance the watermark monotonically in lockstep with them. This way, 
> the watermark cannot decrease at an operator.
> 
> In your case, you have a topology with parallelism 1, I assume. In that case 
> the operators are chained. (There are no separate operators; basically there
> is only one, and elements are passed along via function calls.) In this
> setting the watermarks are directly forwarded to operators without going 
> through the logic I mentioned above.
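The two paths described above can be modelled without Flink: a non-chained operator keeps the latest watermark of each upstream channel, takes the minimum, and forwards it only if it advances, while chained operators simply hand every watermark on. This is a minimal sketch of that explanation, not Flink's actual operator code; `WatermarkPathsSketch`, `MultiInputOperator` and `forwardChained` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the two watermark paths; not Flink's internal code.
public class WatermarkPathsSketch {

    // Non-chained operator: remembers the latest watermark of each upstream channel.
    static final class MultiInputOperator {
        private final long[] channelWatermarks;
        private long lastEmitted = Long.MIN_VALUE;
        private final List<Long> emitted = new ArrayList<>();

        MultiInputOperator(int numChannels) {
            channelWatermarks = new long[numChannels];
            Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        }

        void onWatermark(int channel, long watermark) {
            channelWatermarks[channel] = watermark;
            // The operator can only be as far along as its slowest input.
            long min = Long.MAX_VALUE;
            for (long w : channelWatermarks) {
                min = Math.min(min, w);
            }
            // Forward only if the combined watermark actually advances.
            if (min > lastEmitted) {
                lastEmitted = min;
                emitted.add(min);
            }
        }

        List<Long> emitted() {
            return emitted;
        }
    }

    // Chained operators: every watermark is passed on by a direct call, unchecked.
    static List<Long> forwardChained(long... watermarks) {
        List<Long> seen = new ArrayList<>();
        for (long w : watermarks) {
            seen.add(w);
        }
        return seen;
    }

    public static void main(String[] args) {
        long[] fromExtractor = {1000L, Long.MIN_VALUE, 2000L};

        // Chained: the drop to Long.MIN_VALUE reaches the next operator.
        System.out.println(forwardChained(fromExtractor)); // prints [1000, -9223372036854775808, 2000]

        // Not chained, one upstream channel: the drop is filtered out.
        MultiInputOperator single = new MultiInputOperator(1);
        for (long w : fromExtractor) {
            single.onWatermark(0, w);
        }
        System.out.println(single.emitted()); // prints [1000, 2000]

        // Two upstream channels: nothing is emitted until both have advanced.
        MultiInputOperator two = new MultiInputOperator(2);
        two.onWatermark(0, 1000L);
        two.onWatermark(0, 2000L);
        System.out.println(two.emitted()); // prints []
        two.onWatermark(1, 1500L);
        System.out.println(two.emitted()); // prints [1500]
    }
}
```

With a single upstream channel the monotonicity check discards the drop to `Long.MIN_VALUE`, whereas the chained path passes it through unchanged. The two-channel run also shows what waiting for all inputs means in practice: as long as one channel stays at `Long.MIN_VALUE`, the operator emits nothing.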
> 
> Cheers,
> Aljoscha
>> On 16 Nov 2015, at 18:13, Konstantin Knauf <konstantin.kn...@tngtech.com> 
>> wrote:
>>
>> Hi Aljoscha,
>>
>> I changed the TimestampExtractor to save the lastSeenTimestamp and only
>> emit watermarks from getCurrentWatermark() [1], as you suggested. So basically
>> I do the opposite of before (before: watermarks only per event; now: watermarks
>> only on the auto-watermark interval). And now it works :). The question remains
>> why it did not work before. As far as I can see, it is an issue with the first
>> TimestampExtractor itself?!
>>
>> Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?
>>
>> Cheers,
>>
>> Konstantin
>>
>> [1]
>>
>>    final private long maxDelay;
>>    private long lastTimestamp = Long.MIN_VALUE;
>>
>>    public PojoTimestampExtractor(long maxDelay) {
>>        this.maxDelay = maxDelay;
>>    }
>>
>>    @Override
>>    public long extractTimestamp(Pojo pojo, long l) {
>>        lastTimestamp = pojo.getTime();
>>        return pojo.getTime();
>>    }
>>
>>    @Override
>>    public long extractWatermark(Pojo pojo, long l) {
>>        return Long.MIN_VALUE;
>>    }
>>
>>    @Override
>>    public long getCurrentWatermark() {
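>>        // before the first element, Long.MIN_VALUE - maxDelay would wrap around
>>        return lastTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : lastTimestamp - maxDelay;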
>>    }
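In this extractor `lastTimestamp` starts out as `Long.MIN_VALUE`, and Java `long` subtraction wraps around silently on overflow, so computing `lastTimestamp - maxDelay` before any element has been seen yields a value close to `Long.MAX_VALUE` rather than a very small one. A small, Flink-free check of that arithmetic, with one possible guard; `naiveWatermark` and `safeWatermark` are hypothetical helper names, not Flink API.

```java
// Flink-free check of the arithmetic behind getCurrentWatermark().
// naiveWatermark and safeWatermark are hypothetical helpers, not Flink API.
public class WatermarkOverflowCheck {

    // What `lastTimestamp - maxDelay` computes: long arithmetic wraps silently.
    static long naiveWatermark(long lastTimestamp, long maxDelay) {
        return lastTimestamp - maxDelay;
    }

    // One possible guard: report "no progress" until an element has been seen.
    static long safeWatermark(long lastTimestamp, long maxDelay) {
        if (lastTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return lastTimestamp - maxDelay;
    }

    public static void main(String[] args) {
        long maxDelay = 6000L;
        System.out.println(naiveWatermark(Long.MIN_VALUE, maxDelay)); // prints 9223372036854769808
        System.out.println(safeWatermark(Long.MIN_VALUE, maxDelay));  // prints -9223372036854775808
        System.out.println(safeWatermark(1447666537260L, maxDelay));  // prints 1447666531260
    }
}
```

Under the extractor rules described in this thread, a value returned by getCurrentWatermark() is emitted whenever it is larger than the last watermark, so the wrapped value would only appear if the auto-watermark timer fires before the first element arrives. The guard removes that timing dependency.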
>>
>>
>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
>>> Hi,
>>> yes, at your data-rate emitting a watermark for every element should not be 
>>> a problem. It could become a problem with higher data-rates since the 
>>> system can get overwhelmed if every element also generates a watermark. In 
>>> that case I would suggest storing the latest element timestamp in an
>>> internal field and only emitting watermarks in getCurrentWatermark(), since
>>> then the watermark frequency can be tuned using the auto-watermark interval
>>> setting.
>>>
>>> But that should not be the cause of the problem that you currently have. 
>>> Would you maybe be willing to send me some (mock) example data and the code 
>>> so that I can reproduce the problem and have a look at it? You can send it
>>> to aljoscha at apache.org.
>>>
>>> Cheers,
>>> Aljoscha
>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf <konstantin.kn...@tngtech.com> 
>>>> wrote:
>>>>
>>>> Hi Aljoscha,
>>>>
>>>> ok, now I at least understand why it works with fromElements(...). For
>>>> the rest I am not so sure.
>>>>
>>>>> What this means in your case is that the watermark can only advance if
>>>>> a new element arrives, because only then is the watermark updated.
>>>>
>>>> But new elements arrive all the time, about 50/s, or do you mean
>>>> something else?
>>>>
>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an ok
>>>> choice, if I understand the semantics correctly. It just affects
>>>> watermarking in the absence of events, right?
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>>
>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>>>>> Hi,
>>>>> it could be what Gyula mentioned. Let me first go a bit into how the 
>>>>> TimestampExtractor works internally.
>>>>>
>>>>> First, the timestamp extractor internally keeps the value of the last 
>>>>> emitted watermark. Then, the semantics of the TimestampExtractor are as 
>>>>> follows:
>>>>> - the result of extractTimestamp is taken and it replaces the internal 
>>>>> timestamp of the element
>>>>> - if the result of extractWatermark is larger than the last watermark, the
>>>>> new value is emitted as a watermark and stored
>>>>> - getCurrentWatermark is called on the specified auto-watermark interval;
>>>>> if the returned value is larger than the last watermark, it is emitted and
>>>>> stored as the last watermark
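The three rules can be modelled outside Flink in a few lines, which makes it easy to see that with getCurrentWatermark() fixed at `Long.MIN_VALUE` the watermark only moves when an element arrives. This is a sketch under assumptions: elements are plain `Long` event times instead of the `Pojo` type, and `ExtractorDriver`, `onElement` and `onAutoWatermarkTimer` are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.ToLongFunction;

// Hypothetical model of the three TimestampExtractor rules; not Flink code.
public class ExtractorSemanticsSketch {

    static final class ExtractorDriver<T> {
        private final ToLongFunction<T> extractTimestamp;
        private final ToLongFunction<T> extractWatermark;
        private final LongSupplier getCurrentWatermark;
        private long lastWatermark = Long.MIN_VALUE;
        final List<Long> timestamps = new ArrayList<>();
        final List<Long> watermarks = new ArrayList<>();

        ExtractorDriver(ToLongFunction<T> extractTimestamp,
                        ToLongFunction<T> extractWatermark,
                        LongSupplier getCurrentWatermark) {
            this.extractTimestamp = extractTimestamp;
            this.extractWatermark = extractWatermark;
            this.getCurrentWatermark = getCurrentWatermark;
        }

        // Rules 1 and 2: run for every incoming element.
        void onElement(T element) {
            timestamps.add(extractTimestamp.applyAsLong(element));
            maybeEmit(extractWatermark.applyAsLong(element));
        }

        // Rule 3: run once per auto-watermark interval, element or not.
        void onAutoWatermarkTimer() {
            maybeEmit(getCurrentWatermark.getAsLong());
        }

        // A watermark is emitted only if it is larger than the last one emitted.
        private void maybeEmit(long candidate) {
            if (candidate > lastWatermark) {
                lastWatermark = candidate;
                watermarks.add(candidate);
            }
        }
    }

    public static void main(String[] args) {
        final long maxDelay = 6000L;
        // Mirrors the first extractor in this thread, with plain Long event times.
        ExtractorDriver<Long> driver = new ExtractorDriver<>(
                t -> t,
                t -> t - maxDelay,
                () -> Long.MIN_VALUE);

        driver.onElement(10_000L);     // candidate 4000: emitted
        driver.onAutoWatermarkTimer(); // Long.MIN_VALUE: ignored
        driver.onElement(12_000L);     // candidate 6000: emitted
        driver.onElement(11_000L);     // candidate 5000: lower, ignored
        driver.onAutoWatermarkTimer(); // Long.MIN_VALUE: ignored
        System.out.println(driver.watermarks); // prints [4000, 6000]
    }
}
```

Because getCurrentWatermark() always returns `Long.MIN_VALUE`, the timer calls never contribute, and the watermark advances only when an element with a larger candidate arrives.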
>>>>>
>>>>> What this means in your case is that the watermark can only advance if a 
>>>>> new element arrives, because only then is the watermark updated.
>>>>>
>>>>> The reason why you see results if you use fromElements is that the 
>>>>> window-operator also emits all the windows that it currently has buffered 
>>>>> if the program closes. This happens in the case of fromElements because 
>>>>> only a finite number of elements is emitted, after which the source 
>>>>> closes, thereby finishing the whole program.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>
>>>>>> Could this part of the extractor be the problem, Aljoscha?
>>>>>>
>>>>>> @Override
>>>>>>   public long getCurrentWatermark() {
>>>>>>       return Long.MIN_VALUE;
>>>>>>   }
>>>>>>
>>>>>> Gyula
>>>>>>
>>>>>> On Mon, 16 Nov 2015, 10:39, Konstantin Knauf <konstantin.kn...@tngtech.com>
>>>>>> wrote:
>>>>>> Hi Aljoscha,
>>>>>>
>>>>>> thanks for your answer. Yes I am using the same TimestampExtractor-Class.
>>>>>>
>>>>>> The timestamps look good to me. Here is an example.
>>>>>>
>>>>>> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
>>>>>>
>>>>>> The order now is
>>>>>>
>>>>>> stream
>>>>>> .map(dummyMapper)
>>>>>> .assignTimestamps(...)
>>>>>> .timeWindow(...)
>>>>>>
>>>>>> Is there a way to print out the assigned timestamps after
>>>>>> stream.assignTimestamps(...)?
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Konstantin
>>>>>>
>>>>>>
>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>>>>>> Hi,
>>>>>>> are you also using the timestamp extractor when you are using 
>>>>>>> env.fromCollection()?
>>>>>>>
>>>>>>> Could you maybe insert a dummy mapper after the Kafka source that just 
>>>>>>> prints the element and forwards it? That way we can see whether the elements
>>>>>>> come from Kafka with a correct timestamp.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf 
>>>>>>>> <konstantin.kn...@tngtech.com> wrote:
>>>>>>>>
>>>>>>>> Hi everyone,
>>>>>>>>
>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>>>>>
>>>>>>>> I am using a very simple TimestampExtractor like [1], which just
>>>>>>>> extracts a millis timestamp from a POJO. In my streaming job, I read in
>>>>>>>> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>>>>>
>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<>(
>>>>>>>>         parameterTool.getRequired("topic"),
>>>>>>>>         new AvroPojoDeserializationSchema(),
>>>>>>>>         parameterTool.getProperties()));
>>>>>>>>
>>>>>>>> I have timestampEnabled(), the TimeCharacteristic is EventTime, and the
>>>>>>>> AutoWatermarkInterval is 500 ms.
>>>>>>>>
>>>>>>>> The problem is, when I do something like:
>>>>>>>>
>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>>>>> .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>>>>>>>> .sum(..)
>>>>>>>> .print()
>>>>>>>>
>>>>>>>> env.execute();
>>>>>>>>
>>>>>>>> the windows never get triggered.
>>>>>>>>
>>>>>>>> If I use ProcessingTime it works.
>>>>>>>> If I use env.fromCollection(...) instead of the KafkaSource it works
>>>>>>>> with EventTime, too.
>>>>>>>>
>>>>>>>> Any ideas what I could be doing wrong are highly appreciated.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>>
>>>>>>>> Konstantin
>>>>>>>>
>>>>>>>> [1]:
>>>>>>>>
>>>>>>>> public class PojoTimestampExtractor implements 
>>>>>>>> TimestampExtractor<Pojo> {
>>>>>>>>
>>>>>>>>  final private long maxDelay;
>>>>>>>>
>>>>>>>>  public PojoTimestampExtractor(long maxDelay) {
>>>>>>>>      this.maxDelay = maxDelay;
>>>>>>>>  }
>>>>>>>>
>>>>>>>>  @Override
>>>>>>>>  public long extractTimestamp(Pojo pojo, long l) {
>>>>>>>>      return pojo.getTime();
>>>>>>>>  }
>>>>>>>>
>>>>>>>>  @Override
>>>>>>>>  public long extractWatermark(Pojo pojo, long l) {
>>>>>>>>      return pojo.getTime() - maxDelay;
>>>>>>>>  }
>>>>>>>>
>>>>>>>>  @Override
>>>>>>>>  public long getCurrentWatermark() {
>>>>>>>>      return Long.MIN_VALUE;
>>>>>>>>  }
>>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>>
>>>>>
>>>>
>>>
>>>
>>
> 
> 

