If my understanding is correct, then why does `assignTimestampsAndWatermarks` 
before `keyBy` work? The `timeWindowAll` stream's inputs are task 1 and task 
2, with task 2 idling, regardless of whether `assignTimestampsAndWatermarks` 
comes before or after `keyBy`, because whether task 2 receives elements depends 
only on the key distribution and has nothing to do with timestamp assignment, right?

                                                                                
                                                    /key 1 trips\
                                                   /             \
(A) trips--> assignTimestampsAndWatermarks-->keyBy                timeWindowAll
                                                   \    idle     /
                                                    \key 2 trips/

                   /key 1 trips--> assignTimestampsAndWatermarks\
                  /                                              \
(B) trips-->keyBy                                                 timeWindowAll
                  \                   idle                       /
                   \key 2 trips--> assignTimestampsAndWatermarks/

How do things differ between A and B from `timeWindowAll`'s perspective?

BTW, thanks for the webinar link, I'll check it later.
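
One way to picture the difference between (A) and (B) is the minimal sketch below. It rests on two assumptions about Flink's behavior: an operator's event-time clock is the minimum of the latest watermarks seen on its input channels, and watermarks, unlike elements, are broadcast to every downstream subtask instead of being routed by key. The class `WatermarkSketch`, the method `operatorWatermark`, and the sample watermark value are hypothetical names and numbers used only for illustration; this is a plain-Java simulation, not Flink code.

```java
import java.util.Arrays;

class WatermarkSketch {
    // Assumption: an operator's event time is the minimum watermark over all its inputs.
    static long operatorWatermark(long[] inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long never = Long.MIN_VALUE;   // an input that has never delivered a watermark
        long wm = 1_555_000_000_000L;  // hypothetical watermark value in epoch millis

        // (A) Watermarks are generated before keyBy and broadcast downstream,
        //     so task 2 forwards them even though it receives no elements.
        long[] caseA = {wm, wm};

        // (B) The assigner inside task 2 never sees an element, so it never
        //     emits a watermark and its output stays at the initial value.
        long[] caseB = {wm, never};

        System.out.println("A: " + operatorWatermark(caseA));
        System.out.println("B: " + operatorWatermark(caseB));
    }
}
```

Under these assumptions the window operator's clock advances in (A) but stays at `Long.MIN_VALUE` in (B), which would explain why no window fires there.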

On 2019/04/25 08:30:20, Dawid Wysakowicz <dwysakow...@apache.org> wrote: 
> Hi,
> 
> Yes, I think your explanation is correct. I can also recommend Seth's
> webinar, where he talks about debugging watermarks [1].
> 
> Best,
> 
> Dawid
> 
> [1]
> https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
> 
> On 22/04/2019 22:55, an0 wrote:
> > Thanks, I feel I'm getting closer to the truth. 
> >
> > So parallelism is the cause? Say my parallelism is 2. Does that mean I get 
> > 2 tasks running after `keyBy` even if all elements have the same key and so 
> > go to only 1 downstream task (say task 1)? And is it the other task (task 2), 
> > with no incoming data, that keeps the `timeWindowAll` stream from progressing? 
> > Because both task 1 and task 2 are its input streams and one is idling, 
> > its event time cannot make progress?
> >
> > On 2019/04/22 01:57:39, Guowei Ma <guowei....@gmail.com> wrote: 
> >> Hi,
> >>
> >> A BoundedOutOfOrdernessTimestampExtractor can send a WM only after it
> >> has received at least one element.
> >>
> >> For after keyBy:
> >> Flink uses the hash code of the key and the parallelism of the downstream
> >> operator to decide which subtask receives the element. This means that if
> >> your key is always the same, all the sources send their elements to the same
> >> downstream task, for example only BoundedOutOfOrdernessTimestampExtractor no. 3.
> >>
> >> For before keyBy:
> >> In your case, the source and the BoundedOutOfOrdernessTimestampExtractors
> >> would be chained together, which means every
> >> BoundedOutOfOrdernessTimestampExtractor will receive elements.
> >>
> >> Best,
> >> Guowei
> >>
> >>
> >> an0 <an0...@gmail.com> wrote on Fri, Apr 19, 2019 at 10:41 PM:
> >>
> >>> Hi,
> >>>
> >>> First of all, thank you for the `shuffle()` tip. It works. However, I
> >>> still don't understand why it doesn't work without calling `shuffle()`.
> >>>
> >>> Why would not all BoundedOutOfOrdernessTimestampExtractors receive trips?
> >>> All the trips have keys and timestamps. As I said in my reply to Paul, I
> >>> see the same watermarks being extracted.
> >>>
> >>> How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy`
> >>> matter? My understanding is that any specific window for a specific key
> >>> always receives exactly the same data, and the calling order of
> >>> `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
> >>>
> >>> To make `keyBy` as irrelevant as possible, I tried letting it always
> >>> return the same key so that there is only 1 keyed stream and it is exactly
> >>> the same as the original unkeyed stream. It still doesn't trigger windows:
> >>> ```java
> >>> DataStream<Trip> trips = env.addSource(consumer);
> >>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
> >>> DataStream<Trip> featurizedUserTrips =
> >>>         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(new
> >>> BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>     @Override
> >>>     public long extractTimestamp(Trip trip) {
> >>>         return trip.endTime.getTime();
> >>>     }
> >>> });
> >>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >>> featurizedUserTrips.timeWindowAll(Time.days(7),
> >>>         Time.days(1));
> >>> ```
> >>>
> >>> It makes no sense to me. Please help me understand why it doesn't work.
> >>> Thanks!
> >>>
> >>> On 2019/04/19 04:14:31, Guowei Ma <guowei....@gmail.com> wrote:
> >>>> Hi,
> >>>> After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors
> >>>> receive elements (trips). If that is the case, a
> >>>> BoundedOutOfOrdernessTimestampExtractor that does not receive any element
> >>>> would not send a WM, so the timeWindowAll operator could not be
> >>>> triggered.
> >>>> You could add a shuffle() before the assignTimestampsAndWatermarks in
> >>>> your second case and check whether the window is triggered. If it is
> >>>> triggered, you could check the distribution of the elements generated by
> >>>> the source.
> >>>>
> >>>> Best,
> >>>> Guowei
> >>>>
> >>>>
> >>>> an0...@gmail.com <an0...@gmail.com> wrote on Fri, Apr 19, 2019 at 4:10 AM:
> >>>>
> >>>>> I don't think it is the watermark. I see the same watermarks from the
> >>>>> two versions of the code.
> >>>>>
> >>>>> The processing on the keyed stream doesn't change event time at all. I
> >>>>> can simply change my code to use `map` on the keyed stream to return the
> >>>>> input data unchanged, so that the window operator receives exactly the
> >>>>> same data. The only difference is when I call
> >>>>> `assignTimestampsAndWatermarks`. The result is the same.
> >>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
> >>>>> ```java
> >>>>> DataStream<Trip> trips =
> >>>>>         env.addSource(consumer).assignTimestampsAndWatermarks(new
> >>>>> BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>>>     @Override
> >>>>>     public long extractTimestamp(Trip trip) {
> >>>>>         return trip.endTime.getTime();
> >>>>>     }
> >>>>> });
> >>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> >>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >>>>> featurizedUserTrips.timeWindowAll(Time.days(7),
> >>>>>         Time.days(1));
> >>>>> ```
> >>>>>
> >>>>> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> >>>>> ```java
> >>>>> DataStream<Trip> trips = env.addSource(consumer);
> >>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>> DataStream<Trip> featurizedUserTrips =
> >>>>>         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(new
> >>>>> BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>>>     @Override
> >>>>>     public long extractTimestamp(Trip trip) {
> >>>>>         return trip.endTime.getTime();
> >>>>>     }
> >>>>> });
> >>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >>>>> featurizedUserTrips.timeWindowAll(Time.days(7),
> >>>>>         Time.days(1));
> >>>>> ```
> >>>>>
> >>>>> It feels like a bug to me, but I want to confirm it before I file the bug
> >>>>> report.
> >>>>>
> >>>>> On 2019/04/18 03:38:34, Paul Lam <paullin3...@gmail.com> wrote:
> >>>>>> Hi,
> >>>>>>
> >>>>>> Could you check the watermark of the window operator? One possible
> >>>>>> situation would be that some of the keys are not getting enough inputs,
> >>>>>> so their watermarks remain below the window end time and hold the window
> >>>>>> operator's watermark back. IMO, it's good practice to assign watermarks
> >>>>>> earlier in the data pipeline.
> >>>>>> Best,
> >>>>>> Paul Lam
> >>>>>>
> >>>>>>> On Apr 17, 2019, at 23:04, an0...@gmail.com wrote:
> >>>>>>>
> >>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
> >>>>>>> ```java
> >>>>>>> DataStream<Trip> trips =
> >>>>>>>        env.addSource(consumer).assignTimestampsAndWatermarks(new
> >>>>>>>        BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>>>>>            @Override
> >>>>>>>            public long extractTimestamp(Trip trip) {
> >>>>>>>                return trip.endTime.getTime();
> >>>>>>>            }
> >>>>>>>        });
> >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> >>>>>>>        userTrips.process(new Featurization());
> >>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> >>>>>>>        featurizedUserTrips.timeWindowAll(Time.days(7),
> >>>>>>>                Time.days(1));
> >>>>>>> ```
> >>>>>>>
> >>>>>>> But not after `keyBy` and `process`:
> >>>>>>> ```java
> >>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> >>>>>>>        userTrips.process(new Featurization()).assignTimestampsAndWatermarks(new
> >>>>>>>        BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> >>>>>>>            @Override
> >>>>>>>            public long extractTimestamp(FeaturizedTrip trip) {
> >>>>>>>                return trip.endTime.getTime();
> >>>>>>>            }
> >>>>>>>        });
> >>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> >>>>>>>        featurizedUserTrips.timeWindowAll(Time.days(7),
> >>>>>>>                Time.days(1));
> >>>>>>> ```
> >>>>>>> Windows are never triggered.
> >>>>>>>
> >>>>>>> Is it a bug or expected behavior? If the latter, where is it
> >>>>>>> documented?
> >>>>>>
> 
> 
