Hmm, that’s very strange. I’ll continue looking into it.
> On 17 Nov 2015, at 21:42, Konstantin Knauf <konstantin.kn...@tngtech.com>
> wrote:
>
> Hi Aljoscha,
>
> Are you sure? I am running the job from my IDE at the moment.
>
> If I set
>
> StreamExecutionEnvironment.setParallelism(1);
>
> It works with the old TimestampExtractor (returning Long.MIN_VALUE from
> getCurrentWatermark() and emitting a watermark at every record)
>
> If I set
>
> StreamExecutionEnvironment.setParallelism(5);
>
> it does not work.
>
> So, if I understood you correctly, it is the opposite of what you were
> expecting?!
>
> Cheers,
>
> Konstantin
>
>
> On 17.11.2015 11:32, Aljoscha Krettek wrote:
>> Hi,
>> actually, the bug is more subtle. Normally, it is not a problem that the
>> TimestampExtractor sometimes emits a watermark that is lower than the one
>> before. (This is the result of the bug with Long.MIN_VALUE I mentioned
>> before). The stream operators wait for watermarks from all upstream
>> operators and only advance the watermark monotonically in lockstep with
>> them. This way, the watermark cannot decrease at an operator.
>>
>> In your case, you have a topology with parallelism 1, I assume. In that case
>> the operators are chained. (There are no separate operators but basically
>> only one operator and element transmission happens in function calls). In
>> this setting the watermarks are directly forwarded to operators without
>> going through the logic I mentioned above.
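
The alignment logic described above can be sketched in plain Java. This is only an illustration under assumptions: WatermarkAligner and onWatermark are hypothetical names, not Flink classes, and the real operator code may differ. The sketch keeps one watermark per input channel and forwards the minimum, never moving backwards.

```java
import java.util.Arrays;

/** Hypothetical stand-in for an operator input with one watermark per upstream channel. */
final class WatermarkAligner {
    private final long[] channelWatermarks;
    private long current = Long.MIN_VALUE;

    WatermarkAligner(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /**
     * Records a watermark from one channel and returns the operator watermark:
     * the minimum over all channels, which never decreases.
     */
    long onWatermark(int channel, long watermark) {
        // A lower watermark on a channel is ignored instead of lowering the stored value.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        current = Math.max(current, min);
        return current;
    }

    public static void main(String[] args) {
        WatermarkAligner aligner = new WatermarkAligner(2);
        System.out.println(aligner.onWatermark(0, 100)); // channel 1 has not reported yet
        System.out.println(aligner.onWatermark(1, 80));  // both channels reported, minimum is 80
        System.out.println(aligner.onWatermark(1, 50));  // decreasing watermark has no effect
    }
}
```

With a single chained operator there is no such per-channel step, so a decreasing watermark would be passed on unchanged.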
>>
>> Cheers,
>> Aljoscha
>>> On 16 Nov 2015, at 18:13, Konstantin Knauf <konstantin.kn...@tngtech.com>
>>> wrote:
>>>
>>> Hi Aljoscha,
>>>
>>> I changed the TimestampExtractor to save the lastSeenTimestamp and only
>>> emit with getCurrentWatermark [1] as you suggested. So basically I do
>>> the opposite of what I did before (watermarks only per event vs.
>>> watermarks only per auto-watermark interval). And now it works :). The
>>> question remains why it did not work before. As far as I can see, it is
>>> an issue with the first TimestampExtractor itself?!
>>>
>>> Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>> [1]
>>>
>>> private final long maxDelay;
>>> private long lastTimestamp = Long.MIN_VALUE;
>>>
>>> public PojoTimestampExtractor(long maxDelay) {
>>>     this.maxDelay = maxDelay;
>>> }
>>>
>>> @Override
>>> public long extractTimestamp(Pojo pojo, long l) {
>>>     lastTimestamp = pojo.getTime();
>>>     return pojo.getTime();
>>> }
>>>
>>> @Override
>>> public long extractWatermark(Pojo pojo, long l) {
>>>     return Long.MIN_VALUE;
>>> }
>>>
>>> @Override
>>> public long getCurrentWatermark() {
>>>     return lastTimestamp - maxDelay;
>>> }
>>>
>>>
>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
>>>> Hi,
>>>> yes, at your data-rate emitting a watermark for every element should not
>>>> be a problem. It could become a problem with higher data-rates since the
>>>> system can get overwhelmed if every element also generates a watermark. In
>>>> that case I would suggest storing the latest element timestamp in an
>>>> internal field and only emitting in getCurrentWatermark(), since the
>>>> watermark interval can then be tuned using the auto-watermark interval
>>>> setting.
>>>>
>>>> But that should not be the cause of the problem that you currently have.
>>>> Would you maybe be willing to send me some (mock) example data and the
>>>> code at aljoscha at apache.org, so that I can reproduce the problem and
>>>> have a look at it?
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf <konstantin.kn...@tngtech.com>
>>>>> wrote:
>>>>>
>>>>> Hi Aljoscha,
>>>>>
>>>>> ok, now I at least understand why it works with fromElements(...). For
>>>>> the rest I am not so sure.
>>>>>
>>>>>> What this means in your case is that the watermark can only advance if
>>>>> a new element arrives, because only then is the watermark updated.
>>>>>
>>>>> But new elements arrive all the time, about 50/s, or do you mean
>>>>> something else?
>>>>>
>>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an ok
>>>>> choice, if I understand the semantics correctly. It just affects
>>>>> watermarking in the absence of events, right?
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Konstantin
>>>>>
>>>>>
>>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>>>>>> Hi,
>>>>>> it could be what Gyula mentioned. Let me first go a bit into how the
>>>>>> TimestampExtractor works internally.
>>>>>>
>>>>>> First, the timestamp extractor internally keeps the value of the last
>>>>>> emitted watermark. Then, the semantics of the TimestampExtractor are as
>>>>>> follows:
>>>>>> - the result of extractTimestamp is taken and it replaces the internal
>>>>>> timestamp of the element
>>>>>> - if the result of extractWatermark is larger than the last watermark
>>>>>> the new value is emitted as a watermark and the value is stored
>>>>>> - getCurrentWatermark is called on the specified auto-watermark
>>>>>> interval, if the returned value is larger than the last watermark it is
>>>>>> emitted and stored as last watermark
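
The three rules above can be replayed in a small plain-Java sketch. Everything here is an assumption made for illustration: ExtractorSemanticsSketch, onElement and onTimer are hypothetical names, the Extractor interface only mirrors the three methods discussed in this thread, and Flink's actual operator is not reproduced. The demo uses the per-element extractor from the first mail with a maxDelay of 6000.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java stand-in for the rules listed above; not Flink's actual operator. */
final class ExtractorSemanticsSketch {

    /** Mirrors the three methods discussed in this thread. */
    interface Extractor<T> {
        long extractTimestamp(T element, long currentTimestamp);
        long extractWatermark(T element, long currentTimestamp);
        long getCurrentWatermark();
    }

    private long lastWatermark = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    /** Per element: replace the timestamp, then emit a watermark only if it is larger. */
    <T> long onElement(Extractor<T> extractor, T element) {
        long timestamp = extractor.extractTimestamp(element, Long.MIN_VALUE);
        emitIfLarger(extractor.extractWatermark(element, timestamp));
        return timestamp;
    }

    /** Called once per auto-watermark interval. */
    <T> void onTimer(Extractor<T> extractor) {
        emitIfLarger(extractor.getCurrentWatermark());
    }

    private void emitIfLarger(long candidate) {
        if (candidate > lastWatermark) {
            lastWatermark = candidate;
            emitted.add(candidate);
        }
    }

    List<Long> emittedWatermarks() {
        return emitted;
    }

    /** Replays three element timestamps and one timer tick. */
    static List<Long> demo() {
        final long maxDelay = 6000;
        Extractor<Long> perElement = new Extractor<Long>() {
            public long extractTimestamp(Long t, long ignored) { return t; }
            public long extractWatermark(Long t, long ignored) { return t - maxDelay; }
            public long getCurrentWatermark() { return Long.MIN_VALUE; }
        };
        ExtractorSemanticsSketch op = new ExtractorSemanticsSketch();
        op.onElement(perElement, 10_000L);  // emits 4000
        op.onTimer(perElement);             // Long.MIN_VALUE is never larger, nothing emitted
        op.onElement(perElement, 9_000L);   // candidate 3000 is not larger, nothing emitted
        op.onElement(perElement, 12_000L);  // emits 6000
        return op.emittedWatermarks();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [4000, 6000]
    }
}
```

In this sketch the timer never contributes a watermark, so progress depends entirely on arriving elements.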
>>>>>>
>>>>>> What this means in your case is that the watermark can only advance if a
>>>>>> new element arrives, because only then is the watermark updated.
>>>>>>
>>>>>> The reason why you see results if you use fromElements is that the
>>>>>> window-operator also emits all the windows that it currently has
>>>>>> buffered if the program closes. This happens in the case of fromElements
>>>>>> because only a finite number of elements is emitted, after which the
>>>>>> source closes, thereby finishing the whole program.
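
A toy version of this flush-on-close behaviour, as a sketch only: WindowFlushSketch and its methods are hypothetical, it sums values in tumbling windows, and it assumes a window fires once the watermark reaches the window's last timestamp. It is not Flink's window operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy tumbling event-time window that sums values; not Flink's window operator. */
final class WindowFlushSketch {
    private final long windowSize;
    // Window start -> running sum, sorted so that windows fire in order.
    private final TreeMap<Long, Long> buffered = new TreeMap<>();
    private final List<String> output = new ArrayList<>();

    WindowFlushSketch(long windowSize) {
        this.windowSize = windowSize;
    }

    void onElement(long timestamp, long value) {
        long start = timestamp - Math.floorMod(timestamp, windowSize);
        buffered.merge(start, value, Long::sum);
    }

    /** Fires every window whose last timestamp is at or before the watermark. */
    void onWatermark(long watermark) {
        while (!buffered.isEmpty() && buffered.firstKey() + windowSize - 1 <= watermark) {
            fire(buffered.pollFirstEntry());
        }
    }

    /** On close, everything still buffered is emitted regardless of the watermark. */
    void close() {
        while (!buffered.isEmpty()) {
            fire(buffered.pollFirstEntry());
        }
    }

    private void fire(Map.Entry<Long, Long> window) {
        long end = window.getKey() + windowSize;
        output.add("[" + window.getKey() + "," + end + ") sum=" + window.getValue());
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        WindowFlushSketch window = new WindowFlushSketch(1000);
        window.onElement(100, 1);
        window.onElement(900, 2);
        window.onElement(1500, 5);
        window.onWatermark(Long.MIN_VALUE);   // watermark never advanced: nothing fires
        System.out.println(window.output());  // prints []
        window.close();                       // a finite source ends, so buffers are flushed
        System.out.println(window.output());  // prints [[0,1000) sum=3, [1000,2000) sum=5]
    }
}
```

This matches the observation in the thread: with a finite source the results appear at the end even if no watermark ever triggered a window.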
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>>
>>>>>>> Could this part of the extractor be the problem Aljoscha?
>>>>>>>
>>>>>>> @Override
>>>>>>> public long getCurrentWatermark() {
>>>>>>>     return Long.MIN_VALUE;
>>>>>>> }
>>>>>>>
>>>>>>> Gyula
>>>>>>>
>>>>>>> Konstantin Knauf <konstantin.kn...@tngtech.com> wrote (on
>>>>>>> Mon, 16 Nov 2015, 10:39):
>>>>>>> Hi Aljoscha,
>>>>>>>
>>>>>>> thanks for your answer. Yes I am using the same
>>>>>>> TimestampExtractor-Class.
>>>>>>>
>>>>>>> The timestamps look good to me. Here is an example.
>>>>>>>
>>>>>>> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
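
The conversion above can be double-checked with java.time. This is a minimal sketch, assuming the value is epoch milliseconds and the local offset is +01:00; EpochMillisCheck and toOffsetDateTime are hypothetical helper names.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

/** Converts an epoch-millisecond timestamp to a date-time at a fixed UTC offset. */
final class EpochMillisCheck {

    static OffsetDateTime toOffsetDateTime(long epochMillis, int offsetHours) {
        return Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.ofHours(offsetHours));
    }

    public static void main(String[] args) {
        // prints 2015-11-16T10:35:37.260+01:00
        System.out.println(toOffsetDateTime(1447666537260L, 1));
    }
}
```

If the field were in seconds instead of milliseconds, the same call would land in January 1970, which is a quick way to spot a unit mix-up.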
>>>>>>>
>>>>>>> The order now is
>>>>>>>
>>>>>>> stream
>>>>>>> .map(dummyMapper)
>>>>>>> .assignTimestamps(...)
>>>>>>> .timeWindow(...)
>>>>>>>
>>>>>>> Is there a way to print out the assigned timestamps after
>>>>>>> stream.assignTimestamps(...)?
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Konstantin
>>>>>>>
>>>>>>>
>>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>>>>>>> Hi,
>>>>>>>> are you also using the timestamp extractor when you are using
>>>>>>>> env.fromCollection()?
>>>>>>>>
>>>>>>>> Could you maybe insert a dummy mapper after the Kafka source that just
>>>>>>>> prints the element and forwards it? To see if the elements come with a
>>>>>>>> good timestamp from Kafka.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf
>>>>>>>>> <konstantin.kn...@tngtech.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi everyone,
>>>>>>>>>
>>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>>>>>>
>>>>>>>>> I am using a very simple TimestampExtractor like [1], which just
>>>>>>>>> extracts a millis timestamp from a POJO. In my streaming job, I read
>>>>>>>>> in these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>>>>>>
>>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<>(
>>>>>>>>>     parameterTool.getRequired("topic"),
>>>>>>>>>     new AvroPojoDeserializationSchema(),
>>>>>>>>>     parameterTool.getProperties()));
>>>>>>>>>
>>>>>>>>> I have timestampEnabled() and the TimeCharacteristic is EventTime,
>>>>>>>>> the AutoWatermarkInterval is 500.
>>>>>>>>>
>>>>>>>>> The problem is, when I do something like:
>>>>>>>>>
>>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>>>>>>     .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>>>>>>>>>     .sum(..)
>>>>>>>>>     .print();
>>>>>>>>>
>>>>>>>>> env.execute();
>>>>>>>>>
>>>>>>>>> the windows never get triggered.
>>>>>>>>>
>>>>>>>>> If I use ProcessingTime it works.
>>>>>>>>> If I use env.fromCollection(...) instead of the KafkaSource it works
>>>>>>>>> with EventTime, too.
>>>>>>>>>
>>>>>>>>> Any ideas what I could be doing wrong are highly appreciated.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Konstantin
>>>>>>>>>
>>>>>>>>> [1]:
>>>>>>>>>
>>>>>>>>> public class PojoTimestampExtractor implements
>>>>>>>>>         TimestampExtractor<Pojo> {
>>>>>>>>>
>>>>>>>>>     private final long maxDelay;
>>>>>>>>>
>>>>>>>>>     public PojoTimestampExtractor(long maxDelay) {
>>>>>>>>>         this.maxDelay = maxDelay;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public long extractTimestamp(Pojo pojo, long l) {
>>>>>>>>>         return pojo.getTime();
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public long extractWatermark(Pojo pojo, long l) {
>>>>>>>>>         return pojo.getTime() - maxDelay;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public long getCurrentWatermark() {
>>>>>>>>>         return Long.MIN_VALUE;
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>> Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>> Registered office: Unterföhring * Amtsgericht München * HRB 135082
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>