If my understanding is correct, then why does `assignTimestampsAndWatermarks` before `keyBy` work? The input streams of the `timeWindowAll` operator are task 1 and task 2, and task 2 is idle regardless of whether `assignTimestampsAndWatermarks` comes before or after `keyBy`, because whether task 2 receives elements depends only on the key distribution and has nothing to do with timestamp assignment, right?
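One way to reason about the question above is a small model in plain Java (not Flink code). It rests on two assumptions that are mine and not confirmed anywhere in this thread: the event-time clock of an operator with several inputs is the minimum of the latest watermarks on its input channels, and a watermark emitted upstream of `keyBy` is broadcast to every downstream subtask, whereas a generator placed after `keyBy` emits nothing until its own subtask has seen an element. The names `operatorWatermark`, `assignBeforeKeyBy` and `assignAfterKeyBy` are made up for illustration.

```java
import java.util.Arrays;

class WatermarkSketch {
    // Stand-in for "this input channel has not delivered any watermark yet".
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // Assumption: an operator's event-time clock is the minimum over its input channels.
    static long operatorWatermark(long[] inputChannelWatermarks) {
        return Arrays.stream(inputChannelWatermarks).min().orElse(NO_WATERMARK);
    }

    // Generator before keyBy. Assumption: the watermark is broadcast to every keyed
    // subtask, and each subtask forwards it even if it never receives an element.
    static long assignBeforeKeyBy(long generatedWatermark, int keyedSubtasks) {
        long[] channels = new long[keyedSubtasks];
        Arrays.fill(channels, generatedWatermark);
        return operatorWatermark(channels);
    }

    // Generator after keyBy: one generator per keyed subtask. A subtask that never
    // sees an element never emits a watermark.
    static long assignAfterKeyBy(long generatedWatermark, boolean[] subtaskSawElements) {
        long[] channels = new long[subtaskSawElements.length];
        for (int i = 0; i < channels.length; i++) {
            channels[i] = subtaskSawElements[i] ? generatedWatermark : NO_WATERMARK;
        }
        return operatorWatermark(channels);
    }

    public static void main(String[] args) {
        long watermark = 1_555_000_000_000L;    // arbitrary example timestamp in ms
        boolean[] onlyTask1Busy = {true, false}; // task 2 is idle
        System.out.println("before keyBy: " + assignBeforeKeyBy(watermark, 2));
        System.out.println("after keyBy:  " + assignAfterKeyBy(watermark, onlyTask1Busy));
    }
}
```

Under those assumptions the idle task stalls the window only in the second ordering: in the first it still passes the broadcast watermark along even though it receives no elements.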
                                                   /key 1 trips\
                                                  /             \
(A) trips--> assignTimestampsAndWatermarks-->keyBy                timeWindowAll
                                                  \    idle     /
                                                   \key 2 trips/

                  /key 1 trips--> assignTimestampsAndWatermarks\
                 /                                              \
(B) trips-->keyBy                                                 timeWindowAll
                 \    idle                                      /
                  \key 2 trips--> assignTimestampsAndWatermarks/

How are things different between A and B from `timeWindowAll`'s perspective?

BTW, thanks for the webinar link, I'll check it later.

On 2019/04/25 08:30:20, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> Hi,
>
> Yes I think your explanation is correct. I can also recommend Seth's
> webinar where he talks about debugging Watermarks[1]
>
> Best,
>
> Dawid
>
> [1]
> https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
>
> On 22/04/2019 22:55, an0 wrote:
> > Thanks, I feel I'm getting closer to the truth.
> >
> > So parallelism is the cause? Say my parallelism is 2. Does that mean I get
> > 2 tasks running after `keyBy` even if all elements have the same key and so
> > go to 1 downstream task (say task 1)? And is it the other task (task 2), with
> > no incoming data, that makes the `timeWindowAll` stream unable to progress?
> > Because both task 1 and task 2 are its input streams and one is idling, so
> > its event time cannot make progress?
> >
> > On 2019/04/22 01:57:39, Guowei Ma <guowei....@gmail.com> wrote:
> >> Hi,
> >>
> >> A BoundedOutOfOrdernessTimestampExtractor can send a WM only after it
> >> has received at least one element.
> >>
> >> For after keyBy:
> >> Flink uses the hash code of the key and the parallelism of the downstream
> >> operator to decide which subtask receives the element. This means that if
> >> your key is always the same, all the sources will send their elements to
> >> the same downstream task, for example only
> >> BoundedOutOfOrdernessTimestampExtractor no. 3.
> >>
> >> For before keyBy:
> >> In your case, the Source and the BoundedOutOfOrdernessTimestampExtractors
> >> would be chained together, which means every
> >> BoundedOutOfOrdernessTimestampExtractor will receive elements.
> >>
> >> Best,
> >> Guowei
> >>
> >>
> >> an0 <an0...@gmail.com> wrote on Fri, Apr 19, 2019 at 10:41 PM:
> >>
> >>> Hi,
> >>>
> >>> First of all, thank you for the `shuffle()` tip. It works. However, I
> >>> still don't understand why it doesn't work without calling `shuffle()`.
> >>>
> >>> Why would not all BoundedOutOfOrdernessTimestampExtractors receive trips?
> >>> All the trips have keys and timestamps. As I said in my reply to Paul, I
> >>> see the same watermarks being extracted.
> >>>
> >>> How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy`
> >>> matter? My understanding is that any specific window for a specific key
> >>> always receives exactly the same data, and the calling order of
> >>> `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
> >>>
> >>> To make `keyBy` as irrelevant as possible, I tried letting it always
> >>> return the same key so that there is only 1 keyed stream and it is exactly
> >>> the same as the original unkeyed stream. It still doesn't trigger windows:
> >>> ```java
> >>> DataStream<Trip> trips = env.addSource(consumer);
> >>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
> >>> DataStream<Trip> featurizedUserTrips =
> >>>     userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> >>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>           @Override
> >>>           public long extractTimestamp(Trip trip) {
> >>>             return trip.endTime.getTime();
> >>>           }
> >>>         });
> >>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>> ```
> >>>
> >>> It makes no sense to me. Please help me understand why it doesn't work.
> >>> Thanks!
> >>>
> >>> On 2019/04/19 04:14:31, Guowei Ma <guowei....@gmail.com> wrote:
> >>>> Hi,
> >>>> After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors
> >>>> receive the elements (trips).
> >>>> If that is the case, a BoundedOutOfOrdernessTimestampExtractor that
> >>>> does not receive any element would not send the WM, so the timeWindowAll
> >>>> operator could not be triggered.
> >>>> You could add a shuffle() before the assignTimestampsAndWatermarks in your
> >>>> second case and check if the window is triggered. If it is triggered,
> >>>> you could check the distribution of elements generated by the source.
> >>>>
> >>>> Best,
> >>>> Guowei
> >>>>
> >>>>
> >>>> an0...@gmail.com <an0...@gmail.com> wrote on Fri, Apr 19, 2019 at 4:10 AM:
> >>>>
> >>>>> I don't think it is the watermark. I see the same watermarks from the two
> >>>>> versions of code.
> >>>>>
> >>>>> The processing on the keyed stream doesn't change event time at all. I can
> >>>>> simply change my code to use `map` on the keyed stream to return the
> >>>>> input data, so that the window operator receives exactly the same data. The
> >>>>> only difference is when I do `assignTimestampsAndWatermarks`.
> >>>>> The result is the same: `assignTimestampsAndWatermarks` before `keyBy` works:
> >>>>> ```java
> >>>>> DataStream<Trip> trips =
> >>>>>     env.addSource(consumer).assignTimestampsAndWatermarks(
> >>>>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>>>           @Override
> >>>>>           public long extractTimestamp(Trip trip) {
> >>>>>             return trip.endTime.getTime();
> >>>>>           }
> >>>>>         });
> >>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> >>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>> ```
> >>>>>
> >>>>> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> >>>>> ```java
> >>>>> DataStream<Trip> trips = env.addSource(consumer);
> >>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>> DataStream<Trip> featurizedUserTrips =
> >>>>>     userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> >>>>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>>>           @Override
> >>>>>           public long extractTimestamp(Trip trip) {
> >>>>>             return trip.endTime.getTime();
> >>>>>           }
> >>>>>         });
> >>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>> ```
> >>>>>
> >>>>> It feels like a bug to me, but I want to confirm it before I file the bug
> >>>>> report.
> >>>>>
> >>>>> On 2019/04/18 03:38:34, Paul Lam <paullin3...@gmail.com> wrote:
> >>>>>> Hi,
> >>>>>>
> >>>>>> Could you check the watermark of the window operator? One possible
> >>>>>> situation would be that some of the keys are not getting enough inputs,
> >>>>>> so their watermarks remain below the window end time and hold the window
> >>>>>> operator watermark back. IMO, it's a good practice to assign watermarks
> >>>>>> earlier in the data pipeline.
> >>>>>>
> >>>>>> Best,
> >>>>>> Paul Lam
> >>>>>>
> >>>>>>> On Apr 17, 2019, at 23:04, an0...@gmail.com wrote:
> >>>>>>>
> >>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
> >>>>>>> ```java
> >>>>>>> DataStream<Trip> trips =
> >>>>>>>     env.addSource(consumer).assignTimestampsAndWatermarks(
> >>>>>>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>>>>>           @Override
> >>>>>>>           public long extractTimestamp(Trip trip) {
> >>>>>>>             return trip.endTime.getTime();
> >>>>>>>           }
> >>>>>>>         });
> >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> >>>>>>>     userTrips.process(new Featurization());
> >>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> >>>>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>>>> ```
> >>>>>>>
> >>>>>>> But not after `keyBy` and `process`:
> >>>>>>> ```java
> >>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> >>>>>>>     userTrips.process(new Featurization()).assignTimestampsAndWatermarks(
> >>>>>>>         new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> >>>>>>>           @Override
> >>>>>>>           public long extractTimestamp(FeaturizedTrip trip) {
> >>>>>>>             return trip.endTime.getTime();
> >>>>>>>           }
> >>>>>>>         });
> >>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> >>>>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>>>> ```
> >>>>>>> Windows are never triggered.
> >>>>>>>
> >>>>>>> Is it a bug or expected behavior? If the latter, where is it
> >>>>>>> documented?
> >>>>>>
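
Guowei's point quoted above, that the key's hash code and the downstream parallelism decide which subtask receives an element, can be illustrated with a rough sketch in plain Java. This is not Flink's actual routing, which as far as I know goes through key groups and an extra hashing step. The `targetSubtask` and `countPerSubtask` helpers are hypothetical and use a simple modulo as a stand-in.

```java
import java.util.Arrays;
import java.util.function.Function;

class KeyRoutingSketch {
    // Simplified stand-in for key-to-subtask routing: only the dependence on the
    // key's hashCode and the downstream parallelism is modeled here.
    static int targetSubtask(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Counts how many elements each downstream subtask would receive
    // for a given key selector.
    static int[] countPerSubtask(long[] userIds, Function<Long, Object> keySelector, int parallelism) {
        int[] counts = new int[parallelism];
        for (long userId : userIds) {
            counts[targetSubtask(keySelector.apply(userId), parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] userIds = {1L, 2L, 3L, 4L, 5L, 6L};
        // Constant key, as in keyBy(trip -> 0L): every element lands on one subtask.
        System.out.println(Arrays.toString(countPerSubtask(userIds, id -> 0L, 2)));
        // Key by user id, as in keyBy(trip -> trip.userId): elements spread out.
        System.out.println(Arrays.toString(countPerSubtask(userIds, id -> id, 2)));
    }
}
```

With a constant key one subtask gets everything and the other stays idle, which is the situation where a timestamp extractor placed after `keyBy` on the idle subtask never sees an element.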