Thanks very much. It definitely explains the problem I'm seeing. However, there
is something I need to confirm:
You say "Watermarks are broadcasted/forwarded anyway." Do you mean that in
`assignTimestampsAndWatermarks.keyBy.window`, it doesn't matter what data flows
through a specific key's stream, because all key streams have the same
watermarks? So, time-wise, `window` behaves as if `keyBy` were not there at all?

On 2019/04/26 06:34:10, Dawid Wysakowicz <[email protected]> wrote: 
> Hi,
> 
> Watermarks are meta events that travel independently of data events.
> 
> 1) If you call assignTimestampsAndWatermarks before keyBy, all parallel
> instances of trips have some data (this is my assumption), so Watermarks
> can be generated. Afterwards, even if some of the keyed partitions have
> no data, Watermarks are broadcasted/forwarded anyway. In other words, if
> at some point Watermarks were generated for all partitions of a single
> stage, they will be forwarded beyond this point.
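Point 1) can be illustrated with a toy model of a keyBy exchange. The class below is hypothetical (it is not Flink's runtime) and encodes simplified rules: a record is routed to exactly one downstream subtask chosen from its key (here `hashCode()` modulo the parallelism, an assumption), a watermark is sent to every downstream subtask, and each subtask's event time is the minimum of the latest watermarks on its input channels.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical model of a keyBy exchange (not Flink's API): records go to
 * one downstream subtask chosen by key, watermarks go to all of them.
 */
class KeyByExchangeModel {
    final int downstream;
    // recordsPerSubtask.get(d) = records received by downstream subtask d
    final List<List<String>> recordsPerSubtask = new ArrayList<>();
    // channelWatermarks[d][u] = latest watermark subtask d got from upstream u
    final long[][] channelWatermarks;

    KeyByExchangeModel(int upstream, int downstream) {
        this.downstream = downstream;
        this.channelWatermarks = new long[downstream][upstream];
        for (int d = 0; d < downstream; d++) {
            recordsPerSubtask.add(new ArrayList<>());
            Arrays.fill(channelWatermarks[d], Long.MIN_VALUE); // nothing received yet
        }
    }

    /** A record is routed to a single subtask (simplified: hashCode modulo parallelism). */
    void sendRecord(String key) {
        int target = Math.floorMod(key.hashCode(), downstream);
        recordsPerSubtask.get(target).add(key);
    }

    /** A watermark from upstream subtask u is broadcast to every downstream subtask. */
    void sendWatermark(int u, long watermark) {
        for (int d = 0; d < downstream; d++) {
            channelWatermarks[d][u] = Math.max(channelWatermarks[d][u], watermark);
        }
    }

    /** A subtask's event time is the minimum over all of its input channels. */
    long currentWatermark(int d) {
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks[d]) {
            min = Math.min(min, wm);
        }
        return min;
    }
}
```

In this model, a keyed subtask that never receives a record still advances to the smallest upstream watermark, because watermarks do not follow the keys.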
> 
> 2) If you call assignTimestampsAndWatermarks after keyBy, you try to assign
> watermarks for an empty partition, which produces no Watermarks at all
> for this partition; therefore there is no progress beyond this point.
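Point 2) hinges on how a periodic, bounded-out-of-orderness extractor behaves when its partition stays empty. The class below is a hypothetical simplification in the spirit of `BoundedOutOfOrdernessTimestampExtractor`, not Flink's implementation: the watermark trails the largest timestamp seen by a fixed bound, and an instance that never sees an element has nothing to emit.

```java
import java.util.OptionalLong;

/**
 * Hypothetical, simplified periodic watermark generator with a fixed
 * out-of-orderness bound (not Flink's class).
 */
class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrderness;
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;
    private boolean seenElement = false;

    BoundedOutOfOrdernessModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Called for every element routed to this parallel instance. */
    void onElement(long timestamp) {
        seenElement = true;
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    /**
     * Called on every periodic tick. Returns a watermark only if it is larger
     * than the previous one; an instance that has never seen an element
     * returns nothing, so operators downstream of it cannot advance.
     */
    OptionalLong onPeriodicEmit() {
        if (!seenElement) {
            return OptionalLong.empty();
        }
        long candidate = maxTimestamp - maxOutOfOrderness;
        if (candidate > lastEmitted) {
            lastEmitted = candidate;
            return OptionalLong.of(candidate);
        }
        return OptionalLong.empty();
    }
}
```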
> 
> I hope this clarifies it a bit.
> 
> Best,
> 
> Dawid
> 
> On 25/04/2019 16:49, an0 wrote:
> > If my understanding is correct, then why does `assignTimestampsAndWatermarks`
> > before `keyBy` work? The `timeWindowAll` stream's input streams are task 1
> > and task 2, with task 2 idling, no matter whether
> > `assignTimestampsAndWatermarks` is before or after `keyBy`, because whether
> > task 2 receives elements depends only on the key distribution and has
> > nothing to do with timestamp assignment, right?
> >
> > (A)                                            /--key 1 trips---------\
> > trips--> assignTimestampsAndWatermarks--> keyBy                         timeWindowAll
> >                                                \--key 2 trips (idle)--/
> >
> > (B)           /--key 1 trips---------> assignTimestampsAndWatermarks--\
> > trips--> keyBy                                                          timeWindowAll
> >               \--key 2 trips (idle)--> assignTimestampsAndWatermarks--/
> >
> > How are things different between A and B from `timeWindowAll`'s perspective?
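The question above can be explored with a small simulation. The Java below is a hypothetical toy model of (A) and (B) at parallelism 2 that tracks only watermarks. It assumes that all records share one key, that this key lands on keyed subtask 0, that an extractor which has seen no element contributes `Long.MIN_VALUE`, and that every operator takes the minimum watermark over its input channels.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical watermark-only model of pipelines (A) and (B); not Flink code. */
class PipelineComparison {
    static final int PARALLELISM = 2;

    /** Watermark of one extractor instance, given the timestamps it saw. */
    static long extractorWatermark(List<Long> seen, long bound) {
        if (seen.isEmpty()) {
            return Long.MIN_VALUE; // no element seen, so no watermark
        }
        long max = Long.MIN_VALUE;
        for (long ts : seen) {
            max = Math.max(max, ts);
        }
        return max - bound;
    }

    static long min(long[] values) {
        long m = Long.MAX_VALUE;
        for (long v : values) {
            m = Math.min(m, v);
        }
        return m;
    }

    /** (A): one extractor per source subtask, then keyBy, then timeWindowAll. */
    static long pipelineA(List<List<Long>> perSourceSubtask, long bound) {
        long[] extractorWms = new long[PARALLELISM];
        for (int i = 0; i < PARALLELISM; i++) {
            extractorWms[i] = extractorWatermark(perSourceSubtask.get(i), bound);
        }
        // keyBy broadcasts watermarks, so every keyed subtask (even the one
        // that receives no records) forwards min(extractorWms).
        long[] windowInputs = new long[PARALLELISM];
        Arrays.fill(windowInputs, min(extractorWms));
        return min(windowInputs);
    }

    /** (B): keyBy first, then one extractor per keyed subtask, then timeWindowAll. */
    static long pipelineB(List<List<Long>> perSourceSubtask, long bound) {
        List<List<Long>> perKeyedSubtask = new ArrayList<>();
        for (int i = 0; i < PARALLELISM; i++) {
            perKeyedSubtask.add(new ArrayList<>());
        }
        // Assumption: the single key is routed to keyed subtask 0.
        for (List<Long> source : perSourceSubtask) {
            perKeyedSubtask.get(0).addAll(source);
        }
        long[] windowInputs = new long[PARALLELISM];
        for (int i = 0; i < PARALLELISM; i++) {
            windowInputs[i] = extractorWatermark(perKeyedSubtask.get(i), bound);
        }
        return min(windowInputs);
    }
}
```

With sources `[[100, 200], [150, 250]]` and a bound of 10, `pipelineA` yields 190 while `pipelineB` yields `Long.MIN_VALUE`: in (B) the idle extractor feeds `timeWindowAll` directly and holds it back, whereas in (A) the idle keyed subtask inherits watermarks from upstream extractors that did see data.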
> >
> > BTW, thanks for the webinar link, I'll check it later.
> >
> > On 2019/04/25 08:30:20, Dawid Wysakowicz <[email protected]> wrote: 
> >> Hi,
> >>
> >> Yes, I think your explanation is correct. I can also recommend Seth's
> >> webinar where he talks about debugging Watermarks [1].
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> [1]
> >> https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
> >>
> >> On 22/04/2019 22:55, an0 wrote:
> >>> Thanks, I feel I'm getting closer to the truth. 
> >>>
> >>> So parallelism is the cause? Say my parallelism is 2. Does that mean I
> >>> get 2 tasks running after `keyBy` even if all elements have the same key
> >>> and therefore go to 1 downstream task (say task 1)? And is it the other
> >>> task (task 2), with no incoming data, that leaves the `timeWindowAll`
> >>> stream unable to progress? Because both task 1 and task 2 are its input
> >>> streams and one is idling, its event time cannot make progress?
> >>>
> >>> On 2019/04/22 01:57:39, Guowei Ma <[email protected]> wrote: 
> >>>> Hi,
> >>>>
> >>>> A BoundedOutOfOrdernessTimestampExtractor can only send a WM after it
> >>>> has received at least one element.
> >>>>
> >>>> After keyBy:
> >>>> Flink uses the hash code of the key and the parallelism of the downstream
> >>>> operator to decide which subtask receives the element. This means that if
> >>>> your key is always the same, all the sources will send their elements only
> >>>> to the same downstream task, for example only to
> >>>> BoundedOutOfOrdernessTimestampExtractor no. 3.
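The routing rule described above can be sketched as follows. This is a hypothetical stand-in: Flink maps keys to subtasks through key groups (see `KeyGroupRangeAssignment`, which murmur-hashes `key.hashCode()` first), whereas the sketch uses the raw hash code and an assumed maximum parallelism of 128, so real subtask numbers will differ. The property it illustrates does not depend on the hash function: equal keys always land on the same subtask.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical key routing: hash -> key group -> subtask index. */
class KeyRouting {
    // Assumed maximum parallelism (number of key groups).
    static final int MAX_PARALLELISM = 128;

    static int subtaskFor(Object key, int parallelism) {
        // Simplified: Flink additionally murmur-hashes the hash code.
        int keyGroup = Math.floorMod(key.hashCode(), MAX_PARALLELISM);
        return keyGroup * parallelism / MAX_PARALLELISM;
    }

    /** Counts how many records each downstream subtask would receive. */
    static Map<Integer, Integer> distribution(Iterable<?> keys, int parallelism) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int i = 0; i < parallelism; i++) {
            counts.put(i, 0);
        }
        for (Object key : keys) {
            counts.merge(subtaskFor(key, parallelism), 1, Integer::sum);
        }
        return counts;
    }
}
```

With a constant key such as `trip -> 0L` and parallelism 2, every record maps to one subtask and the other receives nothing, however many trips flow through.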
> >>>>
> >>>> Before keyBy:
> >>>> In your case, the Source and the BoundedOutOfOrdernessTimestampExtractors
> >>>> would be chained together, which means every
> >>>> BoundedOutOfOrdernessTimestampExtractor receives elements.
> >>>>
> >>>> Best,
> >>>> Guowei
> >>>>
> >>>>
> >>>> On Fri, Apr 19, 2019 at 10:41 PM, an0 <[email protected]> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> First of all, thank you for the `shuffle()` tip. It works. However, I
> >>>>> still don't understand why it doesn't work without calling `shuffle()`.
> >>>>>
> >>>>> Why wouldn't all BoundedOutOfOrdernessTimestampExtractors receive trips?
> >>>>> All the trips have keys and timestamps. As I said in my reply to Paul, I
> >>>>> see the same watermarks being extracted.
> >>>>>
> >>>>> How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy`
> >>>>> matter? My understanding is that any specific window for a specific key
> >>>>> always receives exactly the same data, and the calling order of
> >>>>> `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
> >>>>>
> >>>>> To make `keyBy` as irrelevant as possible, I tried letting it always
> >>>>> return the same key, so that there is only 1 keyed stream and it is
> >>>>> exactly the same as the original unkeyed stream. It still doesn't trigger
> >>>>> windows:
> >>>>> ```java
> >>>>> DataStream<Trip> trips = env.addSource(consumer);
> >>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
> >>>>> DataStream<Trip> featurizedUserTrips =
> >>>>>         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> >>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>>>                     @Override
> >>>>>                     public long extractTimestamp(Trip trip) {
> >>>>>                         return trip.endTime.getTime();
> >>>>>                     }
> >>>>>                 });
> >>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>> ```
> >>>>>
> >>>>> It makes no sense to me. Please help me understand why it doesn't work.
> >>>>> Thanks!
> >>>>>
> >>>>> On 2019/04/19 04:14:31, Guowei Ma <[email protected]> wrote:
> >>>>>> Hi,
> >>>>>> After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors
> >>>>>> receive elements (trips). If that is the case, a
> >>>>>> BoundedOutOfOrdernessTimestampExtractor that does not receive any element
> >>>>>> does not send a WM, so the timeWindowAll operator cannot be
> >>>>>> triggered.
> >>>>>> You could add a shuffle() before the assignTimestampsAndWatermarks in your
> >>>>>> second case and check whether the window is triggered. If it is triggered,
> >>>>>> you could then check the distribution of elements generated by the source.
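The `shuffle()` suggestion works because it decouples the extractor's input from the key. The sketch below models round-robin distribution, which is what `rebalance()` does; `shuffle()` instead picks a random subtask per record, so with enough records every subtask also gets some. The class and method names are hypothetical, and always starting at subtask 0 is a simplification.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical round-robin redistribution that ignores record keys. */
class RoundRobinModel {
    static <T> List<List<T>> rebalance(List<T> records, int parallelism) {
        List<List<T>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        int next = 0; // simplification: always start at subtask 0
        for (T record : records) {
            perSubtask.get(next).add(record);
            next = (next + 1) % parallelism;
        }
        return perSubtask;
    }
}
```

Even if every record carries the same key, each downstream extractor receives elements as soon as at least `parallelism` records have passed, so each one can start emitting watermarks.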
> >>>>>>
> >>>>>> Best,
> >>>>>> Guowei
> >>>>>>
> >>>>>>
> >>>>>> On Fri, Apr 19, 2019 at 4:10 AM, [email protected] <[email protected]> wrote:
> >>>>>>
> >>>>>>> I don't think it is the watermark. I see the same watermarks from the
> >>>>>>> two versions of code.
> >>>>>>>
> >>>>>>> The processing on the keyed stream doesn't change event time at all. I
> >>>>>>> can simply change my code to use `map` on the keyed stream to return the
> >>>>>>> input data unchanged, so that the window operator receives exactly the
> >>>>>>> same data. The only difference is where I call
> >>>>>>> `assignTimestampsAndWatermarks`. The result is the same:
> >>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
> >>>>>>> ```java
> >>>>>>> DataStream<Trip> trips =
> >>>>>>>         env.addSource(consumer).assignTimestampsAndWatermarks(
> >>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>>>>>                     @Override
> >>>>>>>                     public long extractTimestamp(Trip trip) {
> >>>>>>>                         return trip.endTime.getTime();
> >>>>>>>                     }
> >>>>>>>                 });
> >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>>>> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>>>> ```
> >>>>>>>
> >>>>>>> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> >>>>>>> ```java
> >>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>>>> DataStream<Trip> featurizedUserTrips =
> >>>>>>>         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> >>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>>>>>                     @Override
> >>>>>>>                     public long extractTimestamp(Trip trip) {
> >>>>>>>                         return trip.endTime.getTime();
> >>>>>>>                     }
> >>>>>>>                 });
> >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>>>> ```
> >>>>>>>
> >>>>>>> It feels like a bug to me, but I want to confirm before I file a bug
> >>>>>>> report.
> >>>>>>>
> >>>>>>> On 2019/04/18 03:38:34, Paul Lam <[email protected]> wrote:
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> Could you check the watermark of the window operator? One possible
> >>>>>>>> situation would be that some of the keys are not getting enough inputs,
> >>>>>>>> so their watermarks remain below the window end time and hold the window
> >>>>>>>> operator's watermark back. IMO, it's good practice to assign watermarks
> >>>>>>>> earlier in the data pipeline.
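The point about watermarks staying below the window end time can be made concrete for the `timeWindowAll(Time.days(7), Time.days(1))` used in this thread. The sketch assumes zero window offset and millisecond timestamps, and the class name is hypothetical: each timestamp falls into size / slide = 7 overlapping windows, and a window `[start, start + size)` can only fire once the operator's watermark reaches `start + size - 1`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of sliding event-time windows (zero offset assumed). */
class SlidingWindowModel {
    static final long DAY = 24L * 60 * 60 * 1000;

    /** Start times of all windows of the given size and slide that contain timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** An event-time window fires once the watermark reaches its last millisecond. */
    static boolean fires(long start, long size, long watermark) {
        return watermark >= start + size - 1;
    }
}
```

If one input channel of the window operator never reports a watermark, the operator's watermark stays at `Long.MIN_VALUE`, and `fires` is false for every window.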
> >>>>>>>> Best,
> >>>>>>>> Paul Lam
> >>>>>>>>
> >>>>>>>>> On Apr 17, 2019, at 23:04, [email protected] wrote:
> >>>>>>>>>
> >>>>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
> >>>>>>>>> ```java
> >>>>>>>>> DataStream<Trip> trips =
> >>>>>>>>>         env.addSource(consumer).assignTimestampsAndWatermarks(
> >>>>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>>>>>>>                     @Override
> >>>>>>>>>                     public long extractTimestamp(Trip trip) {
> >>>>>>>>>                         return trip.endTime.getTime();
> >>>>>>>>>                     }
> >>>>>>>>>                 });
> >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips = userTrips.process(new Featurization());
> >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> >>>>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>>>>>> ```
> >>>>>>>>>
> >>>>>>>>> But not after `keyBy` and `process`:
> >>>>>>>>> ```java
> >>>>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> >>>>>>>>>         userTrips.process(new Featurization()).assignTimestampsAndWatermarks(
> >>>>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> >>>>>>>>>                     @Override
> >>>>>>>>>                     public long extractTimestamp(FeaturizedTrip trip) {
> >>>>>>>>>                         return trip.endTime.getTime();
> >>>>>>>>>                     }
> >>>>>>>>>                 });
> >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> >>>>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>>>>>> ```
> >>>>>>>>> Windows are never triggered.
> >>>>>>>>>
> >>>>>>>>> Is it a bug or expected behavior? If the latter, where is it documented?
> >>
> 
> 
