Hi,

Yes, I think your explanation is correct. I can also recommend Seth's
webinar, in which he talks about debugging watermarks [1].

Best,

Dawid

[1]
https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial

On 22/04/2019 22:55, an0 wrote:
> Thanks, I feel I'm getting closer to the truth. 
>
> So parallelism is the cause? Say my parallelism is 2. Does that mean I get 2
> tasks running after `keyBy` even if all elements have the same key and so go
> to only 1 downstream task (say task 1)? And is it the other task (task 2),
> with no incoming data, that makes the `timeWindowAll` stream unable to
> progress? Because both task 1 and task 2 are its input streams and one is
> idle, so its event time cannot make progress?
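
That matches how an operator with several input channels derives its event time: it takes the minimum of the latest watermarks received on each channel, so a channel that never delivers a watermark keeps the result at its initial value. A minimal sketch in plain Java, with no Flink dependency; the class and method names are hypothetical, and `Long.MIN_VALUE` stands in for "no watermark received yet":

```java
import java.util.Arrays;

// Hypothetical stand-in for the event-time bookkeeping of an operator with
// several input channels; this is not Flink code.
final class WatermarkMinSketch {
    // Latest watermark seen per input channel; MIN_VALUE means "none yet".
    private final long[] channelWatermarks;

    WatermarkMinSketch(int inputChannels) {
        channelWatermarks = new long[inputChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Record a watermark from one channel; watermarks never move backwards.
    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    // The operator's event time is the minimum over all input channels.
    long currentWatermark() {
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }
}
```

With two channels, any number of `onWatermark(0, ...)` calls leaves `currentWatermark()` at `Long.MIN_VALUE` until channel 1 reports a watermark as well, so no event-time window can fire in the meantime.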
>
> On 2019/04/22 01:57:39, Guowei Ma <guowei....@gmail.com> wrote: 
>> Hi,
>>
>> A BoundedOutOfOrdernessTimestampExtractor can send a watermark (WM) only
>> after it has received at least one element.
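
As a rough sketch of that behaviour (plain Java, hypothetical names, not the Flink class itself): the extractor remembers the largest timestamp it has seen and reports that value minus the allowed out-of-orderness, so an instance that has seen no element has nothing to advance its watermark with.

```java
// Hypothetical, simplified model of a bounded-out-of-orderness watermark
// generator; names and structure are illustrative, not Flink's API.
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Chosen so the watermark is Long.MIN_VALUE before any element arrives.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    // Called for every element routed to this instance.
    void onElement(long eventTimestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMillis);
    }

    // The watermark trails the largest timestamp seen by the allowed bound.
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMillis;
    }
}
```

An instance that never has `onElement` called keeps reporting `Long.MIN_VALUE`, which is the situation of the extractor subtasks that receive no trips.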
>>
>> After keyBy:
>> Flink uses the hash code of the key and the parallelism of the downstream
>> operator to decide which subtask receives the element. This means that if
>> your key is always the same, all the sources send their elements to the
>> same downstream task, for example only
>> BoundedOutOfOrdernessTimestampExtractor no. 3.
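
To illustrate the routing with a deliberately simplified model: Flink's actual assignment hashes the key's `hashCode()` into key groups and then maps key groups to subtasks. The sketch below replaces that with a plain modulo (an assumption made for brevity, with hypothetical names), which is enough to show that a constant key always lands on the same subtask.

```java
// Simplified stand-in for key-to-subtask routing after keyBy; the modulo below
// is an assumption for illustration, not Flink's real hash function.
final class KeyRoutingSketch {
    // Pick the downstream subtask index for a key.
    static int subtaskFor(Object key, int downstreamParallelism) {
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), downstreamParallelism);
    }

    // Route `elements` records that all carry the same key and count how many
    // arrive at each downstream subtask.
    static int[] countPerSubtask(Object key, int elements, int downstreamParallelism) {
        int[] counts = new int[downstreamParallelism];
        for (int i = 0; i < elements; i++) {
            counts[subtaskFor(key, downstreamParallelism)]++;
        }
        return counts;
    }
}
```

Calling `KeyRoutingSketch.countPerSubtask(0L, 1000, 2)` models `keyBy(trip -> 0L)` with parallelism 2: one subtask receives every element and the other receives none.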
>>
>> Before keyBy:
>> In your case, the source and the BoundedOutOfOrdernessTimestampExtractor
>> would be chained together, which means every
>> BoundedOutOfOrdernessTimestampExtractor instance receives elements.
>>
>> Best,
>> Guowei
>>
>>
>> On Fri, Apr 19, 2019 at 10:41 PM, an0 <an0...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> First of all, thank you for the `shuffle()` tip. It works. However, I
>>> still don't understand why it doesn't work without calling `shuffle()`.
>>>
>>> Why would not all BoundedOutOfOrdernessTimestampExtractors receive trips?
>>> All the trips have keys and timestamps. As I said in my reply to Paul, I see
>>> the same watermarks being extracted.
>>>
>>> How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy`
>>> matter? My understanding is that any specific window for a specific key
>>> always receives exactly the same data, and the calling order of
>>> `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
>>>
>>> To make `keyBy` as irrelevant as possible, I tried letting it always
>>> return the same key so that there is only 1 keyed stream and it is exactly
>>> the same as the original unkeyed stream. It still doesn't trigger windows:
>>> ```java
>>> DataStream<Trip> trips = env.addSource(consumer);
>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
>>> DataStream<Trip> featurizedUserTrips =
>>>         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(new
>>> BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
>>>     @Override
>>>     public long extractTimestamp(Trip trip) {
>>>         return trip.endTime.getTime();
>>>     }
>>> });
>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
>>> featurizedUserTrips.timeWindowAll(Time.days(7),
>>>         Time.days(1));
>>> ```
>>>
>>> It makes no sense to me. Please help me understand why it doesn't work.
>>> Thanks!
>>>
>>> On 2019/04/19 04:14:31, Guowei Ma <guowei....@gmail.com> wrote:
>>>> Hi,
>>>> After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors
>>>> receive elements (trips). If that is the case, a
>>>> BoundedOutOfOrdernessTimestampExtractor that does not receive any element
>>>> would not send a WM, so the timeWindowAll operator could not be
>>>> triggered.
>>>> You could add a shuffle() before the assignTimestampsAndWatermarks in your
>>>> second case and check whether the window is triggered. If it is triggered,
>>>> you could check the distribution of elements generated by the source.
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>>
>>>> On Fri, Apr 19, 2019 at 4:10 AM, an0...@gmail.com <an0...@gmail.com> wrote:
>>>>
>>>>> I don't think it is the watermark. I see the same watermarks from the two
>>>>> versions of code.
>>>>>
>>>>> The processing on the keyed stream doesn't change event time at all. I can
>>>>> simply change my code to use `map` on the keyed stream to return the
>>>>> input data unchanged, so that the window operator receives exactly the
>>>>> same data. The only difference is when I call
>>>>> `assignTimestampsAndWatermarks`. The result is the same:
>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
>>>>> ```java
>>>>> DataStream<Trip> trips =
>>>>>         env.addSource(consumer).assignTimestampsAndWatermarks(new
>>>>> BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
>>>>>     @Override
>>>>>     public long extractTimestamp(Trip trip) {
>>>>>         return trip.endTime.getTime();
>>>>>     }
>>>>> });
>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
>>>>> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
>>>>> featurizedUserTrips.timeWindowAll(Time.days(7),
>>>>>         Time.days(1));
>>>>> ```
>>>>>
>>>>> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
>>>>> ```java
>>>>> DataStream<Trip> trips = env.addSource(consumer);
>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
>>>>> DataStream<Trip> featurizedUserTrips =
>>>>>         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(new
>>>>> BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
>>>>>     @Override
>>>>>     public long extractTimestamp(Trip trip) {
>>>>>         return trip.endTime.getTime();
>>>>>     }
>>>>> });
>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
>>>>> featurizedUserTrips.timeWindowAll(Time.days(7),
>>>>>         Time.days(1));
>>>>> ```
>>>>>
>>>>> It feels like a bug to me, but I want to confirm it before I file the bug
>>>>> report.
>>>>>
>>>>> On 2019/04/18 03:38:34, Paul Lam <paullin3...@gmail.com> wrote:
>>>>>> Hi,
>>>>>>
>>>>>> Could you check the watermark of the window operator? One possible
>>>>>> situation would be that some of the keys are not getting enough inputs, so
>>>>>> their watermarks remain below the window end time and hold the window
>>>>>> operator watermark back. IMO, it’s good practice to assign watermarks
>>>>>> earlier in the data pipeline.
>>>>>> Best,
>>>>>> Paul Lam
>>>>>>
>>>>>>> On Apr 17, 2019, at 23:04, an0...@gmail.com wrote:
>>>>>>>
>>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
>>>>>>> ```java
>>>>>>> DataStream<Trip> trips =
>>>>>>>        env.addSource(consumer).assignTimestampsAndWatermarks(new
>>>>>>> BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
>>>>>>>            @Override
>>>>>>>            public long extractTimestamp(Trip trip) {
>>>>>>>                return trip.endTime.getTime();
>>>>>>>            }
>>>>>>>        });
>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
>>>>>>>        userTrips.process(new Featurization());
>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
>>>>>>>        featurizedUserTrips.timeWindowAll(Time.days(7),
>>>>>>>                Time.days(1));
>>>>>>> ```
>>>>>>>
>>>>>>> But not after `keyBy` and `process`:
>>>>>>> ```java
>>>>>>> DataStream<Trip> trips = env.addSource(consumer);
>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
>>>>>>>        userTrips.process(new Featurization()).assignTimestampsAndWatermarks(new
>>>>>>> BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
>>>>>>>            @Override
>>>>>>>            public long extractTimestamp(FeaturizedTrip trip) {
>>>>>>>                return trip.endTime.getTime();
>>>>>>>            }
>>>>>>>        });
>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
>>>>>>>        featurizedUserTrips.timeWindowAll(Time.days(7),
>>>>>>>                Time.days(1));
>>>>>>> ```
>>>>>>> Windows are never triggered.
>>>>>>>
>>>>>>> Is it a bug or expected behavior? If the latter, where is it
>>>>>>> documented?
>>>>>>
