Thanks very much. It definitely explains the problem I'm seeing. However, I need to confirm one thing. You say "Watermarks are broadcasted/forwarded anyway." Do you mean that, in `assignTimestampsAndWatermarks().keyBy().window()`, it doesn't matter what data flows through a specific key's stream, and all key streams have the same watermarks? So, time-wise, `window` behaves as if `keyBy` were not there at all?
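The propagation rule described further down in this thread can be illustrated with a small, self-contained model in plain Java. This is a simplified sketch, not Flink's implementation. The `Subtask` class and the `pipelineA`/`pipelineB` helpers are hypothetical. The model rests on three assumptions: each subtask's event time is the minimum of the latest watermarks on its input channels; `keyBy` routes records by key but broadcasts watermarks to every downstream subtask; and an extractor emits nothing until it has seen an element.

```java
import java.util.Arrays;

public class WatermarkPropagationSketch {

    // Hypothetical stand-in for one operator subtask with several input channels.
    static final class Subtask {
        private final long[] channelWatermarks;

        Subtask(int numInputChannels) {
            channelWatermarks = new long[numInputChannels];
            // A channel that has not delivered a watermark yet holds event time back.
            Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        }

        // Watermarks only move forward per channel, so keep the largest one seen.
        void onWatermark(int channel, long watermark) {
            channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        }

        // The subtask's own event-time clock is the minimum over all of its inputs.
        long currentWatermark() {
            return Arrays.stream(channelWatermarks).min().getAsLong();
        }
    }

    // (A) Every extractor subtask (chained to the source) emits a watermark before keyBy.
    //     Watermarks are broadcast to every keyed subtask, whether or not it gets records.
    static long pipelineA(long[] extractorWatermarks) {
        int parallelism = extractorWatermarks.length;
        Subtask window = new Subtask(parallelism);        // timeWindowAll, parallelism 1
        for (int mapIdx = 0; mapIdx < parallelism; mapIdx++) {
            Subtask keyedMap = new Subtask(parallelism);  // one keyed subtask after keyBy
            for (int src = 0; src < parallelism; src++) {
                keyedMap.onWatermark(src, extractorWatermarks[src]); // broadcast
            }
            window.onWatermark(mapIdx, keyedMap.currentWatermark());
        }
        return window.currentWatermark();
    }

    // (B) Extractors run after keyBy; only subtasks that receive records emit watermarks.
    static long pipelineB(long[] extractorWatermarks, boolean[] receivesRecords) {
        Subtask window = new Subtask(extractorWatermarks.length);
        for (int idx = 0; idx < extractorWatermarks.length; idx++) {
            if (receivesRecords[idx]) {
                window.onWatermark(idx, extractorWatermarks[idx]);
            }
            // An idle extractor never emits, so its channel stays at Long.MIN_VALUE.
        }
        return window.currentWatermark();
    }

    public static void main(String[] args) {
        long[] wms = {1_000L, 2_000L};
        System.out.println("A: " + pipelineA(wms));
        System.out.println("B: " + pipelineB(wms, new boolean[] {true, false}));
    }
}
```

In this model, pipeline A gives the window a watermark of 1000 (the slower extractor's value), even though `keyBy` sits in between. Every keyed subtask ends up with the same watermark regardless of which records it received. Pipeline B stays at `Long.MIN_VALUE` because the idle extractor's channel never advances.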
On 2019/04/26 06:34:10, Dawid Wysakowicz <[email protected]> wrote:
> Hi,
>
> Watermarks are meta events that travel independently of data events.
>
> 1) If you assignTimestampsAndWatermarks before keyBy, all parallel
> instances of trips have some data (this is my assumption), so Watermarks
> can be generated. Afterwards, even if some of the keyed partitions have
> no data, Watermarks are broadcasted/forwarded anyway. In other words, if
> at some point Watermarks were generated for all partitions of a single
> stage, they will be forwarded beyond this point.
>
> 2) If you assignTimestampsAndWatermarks after keyBy, you try to assign
> watermarks for an empty partition, which produces no Watermarks at all
> for this partition; therefore there is no progress beyond this point.
>
> I hope this clarifies it a bit.
>
> Best,
>
> Dawid
>
> On 25/04/2019 16:49, an0 wrote:
> > If my understanding is correct, then why does `assignTimestampsAndWatermarks`
> > before `keyBy` work? The `timeWindowAll` stream's input streams are task 1
> > and task 2, with task 2 idling, no matter whether
> > `assignTimestampsAndWatermarks` is before or after `keyBy`, because whether
> > task 2 receives elements depends only on the key distribution and has
> > nothing to do with timestamp assignment, right?
> >
> >                                                    /key 1 trips\
> >                                                   /             \
> > (A) trips--> assignTimestampsAndWatermarks-->keyBy               timeWindowAll
> >                                                   \    idle     /
> >                                                    \key 2 trips/
> >
> >                   /key 1 trips--> assignTimestampsAndWatermarks\
> >                  /                                              \
> > (B) trips-->keyBy                                                timeWindowAll
> >                  \   idle                                       /
> >                   \key 2 trips--> assignTimestampsAndWatermarks/
> >
> > How are things different between A and B from `timeWindowAll`'s perspective?
> >
> > BTW, thanks for the webinar link, I'll check it later.
> >
> > On 2019/04/25 08:30:20, Dawid Wysakowicz <[email protected]> wrote:
> >> Hi,
> >>
> >> Yes, I think your explanation is correct. I can also recommend Seth's
> >> webinar where he talks about debugging Watermarks [1].
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> [1]
> >> https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
> >>
> >> On 22/04/2019 22:55, an0 wrote:
> >>> Thanks, I feel I'm getting closer to the truth.
> >>>
> >>> So parallelism is the cause? Say my parallelism is 2. Does that mean I
> >>> get 2 tasks running after `keyBy` even if all elements have the same key
> >>> and so go to 1 downstream task (say task 1)? And is it the other task
> >>> (task 2), with no incoming data, that causes the `timeWindowAll` stream to
> >>> be unable to progress, because both task 1 and task 2 are its input
> >>> streams and one is idling, so its event time cannot make progress?
> >>>
> >>> On 2019/04/22 01:57:39, Guowei Ma <[email protected]> wrote:
> >>>> Hi,
> >>>>
> >>>> A BoundedOutOfOrdernessTimestampExtractor sends a WM only after it has
> >>>> received at least one element.
> >>>>
> >>>> For after keyBy:
> >>>> Flink uses the hash code of the key and the parallelism of the downstream
> >>>> operator to decide which subtask receives the element. This means that if
> >>>> your key is always the same, all the sources will send the elements only
> >>>> to the same downstream task, for example only to
> >>>> BoundedOutOfOrdernessTimestampExtractor no. 3.
> >>>>
> >>>> For before keyBy:
> >>>> In your case, the Source and BoundedOutOfOrdernessTimestampExtractors
> >>>> would be chained together, which means every
> >>>> BoundedOutOfOrdernessTimestampExtractor will receive elements.
> >>>>
> >>>> Best,
> >>>> Guowei
> >>>>
> >>>>
> >>>> On Fri, Apr 19, 2019 at 10:41 PM, an0 <[email protected]> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> First of all, thank you for the `shuffle()` tip. It works. However, I
> >>>>> still don't understand why it doesn't work without calling `shuffle()`.
> >>>>>
> >>>>> Why wouldn't all BoundedOutOfOrdernessTimestampExtractors receive trips?
> >>>>> All the trips have keys and timestamps. As I said in my reply to Paul, I
> >>>>> see the same watermarks being extracted.
> >>>>>
> >>>>> How could calling `assignTimestampsAndWatermarks` before vs. after
> >>>>> `keyBy` matter? My understanding is that any specific window for a
> >>>>> specific key always receives exactly the same data, and the calling
> >>>>> order of `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect
> >>>>> that.
> >>>>>
> >>>>> To make `keyBy` as irrelevant as possible, I tried letting it always
> >>>>> return the same key, so that there is only 1 keyed stream and it is
> >>>>> exactly the same as the original unkeyed stream. It still doesn't
> >>>>> trigger windows:
> >>>>> ```java
> >>>>> DataStream<Trip> trips = env.addSource(consumer);
> >>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
> >>>>> DataStream<Trip> featurizedUserTrips =
> >>>>>     userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> >>>>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>>>             @Override
> >>>>>             public long extractTimestamp(Trip trip) {
> >>>>>                 return trip.endTime.getTime();
> >>>>>             }
> >>>>>         });
> >>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>> ```
> >>>>>
> >>>>> It makes no sense to me. Please help me understand why it doesn't work.
> >>>>> Thanks!
> >>>>>
> >>>>> On 2019/04/19 04:14:31, Guowei Ma <[email protected]> wrote:
> >>>>>> Hi,
> >>>>>> After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors
> >>>>>> receive elements (trips). If that is the case, a
> >>>>>> BoundedOutOfOrdernessTimestampExtractor that does not receive any element
> >>>>>> will not send a WM, so the timeWindowAll operator cannot be triggered.
> >>>>>> You could add a shuffle() before the assignTimestampsAndWatermarks in your
> >>>>>> second case and check whether the window is triggered. If it is
> >>>>>> triggered, you could check the distribution of elements generated by the
> >>>>>> source.
> >>>>>>
> >>>>>> Best,
> >>>>>> Guowei
> >>>>>>
> >>>>>>
> >>>>>> On Fri, Apr 19, 2019 at 4:10 AM, [email protected] <[email protected]> wrote:
> >>>>>>
> >>>>>>> I don't think it is the watermark. I see the same watermarks from the
> >>>>>>> two versions of code.
> >>>>>>>
> >>>>>>> The processing on the keyed stream doesn't change event time at all. I
> >>>>>>> can simply change my code to use `map` on the keyed stream to return
> >>>>>>> the input data unchanged, so that the window operator receives exactly
> >>>>>>> the same data. The only difference is where I call
> >>>>>>> `assignTimestampsAndWatermarks`. The result is the same:
> >>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
> >>>>>>> ```java
> >>>>>>> DataStream<Trip> trips =
> >>>>>>>     env.addSource(consumer).assignTimestampsAndWatermarks(
> >>>>>>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>>>>>             @Override
> >>>>>>>             public long extractTimestamp(Trip trip) {
> >>>>>>>                 return trip.endTime.getTime();
> >>>>>>>             }
> >>>>>>>         });
> >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>>>> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >>>>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>>>> ```
> >>>>>>>
> >>>>>>> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> >>>>>>> ```java
> >>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>>>> DataStream<Trip> featurizedUserTrips =
> >>>>>>>     userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> >>>>>>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>>>>>             @Override
> >>>>>>>             public long extractTimestamp(Trip trip) {
> >>>>>>>                 return trip.endTime.getTime();
> >>>>>>>             }
> >>>>>>>         });
> >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >>>>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>>>> ```
> >>>>>>>
> >>>>>>> It feels like a bug to me, but I want to confirm it before I file the
> >>>>>>> bug report.
> >>>>>>>
> >>>>>>> On 2019/04/18 03:38:34, Paul Lam <[email protected]> wrote:
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> Could you check the watermark of the window operator? One possible
> >>>>>>>> situation would be that some of the keys are not getting enough
> >>>>>>>> inputs, so their watermarks remain below the window end time and hold
> >>>>>>>> the window operator watermark back. IMO, it’s good practice to assign
> >>>>>>>> watermarks earlier in the data pipeline.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Paul Lam
> >>>>>>>>
> >>>>>>>>> On Apr 17, 2019, at 23:04, [email protected] wrote:
> >>>>>>>>>
> >>>>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
> >>>>>>>>> ```java
> >>>>>>>>> DataStream<Trip> trips =
> >>>>>>>>>     env.addSource(consumer).assignTimestampsAndWatermarks(
> >>>>>>>>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>>>>>>>             @Override
> >>>>>>>>>             public long extractTimestamp(Trip trip) {
> >>>>>>>>>                 return trip.endTime.getTime();
> >>>>>>>>>             }
> >>>>>>>>>         });
> >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> >>>>>>>>>     userTrips.process(new Featurization());
> >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> >>>>>>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>>>>>> ```
> >>>>>>>>>
> >>>>>>>>> But not after `keyBy` and `process`:
> >>>>>>>>> ```java
> >>>>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> >>>>>>>>>     userTrips.process(new Featurization()).assignTimestampsAndWatermarks(
> >>>>>>>>>         new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> >>>>>>>>>             @Override
> >>>>>>>>>             public long extractTimestamp(FeaturizedTrip trip) {
> >>>>>>>>>                 return trip.endTime.getTime();
> >>>>>>>>>             }
> >>>>>>>>>         });
> >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> >>>>>>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>>>>>> ```
> >>>>>>>>> Windows are never triggered.
> >>>>>>>>>
> >>>>>>>>> Is it a bug or expected behavior? If the latter, where is it
> >>>>>>>>> documented?
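Guowei's point about key-based routing can be checked with a similar toy model. The sketch below is a deliberate simplification. Flink itself passes the key's `hashCode()` through a murmur hash into a key group and then maps key groups to operator subtasks. The hypothetical `targetSubtask` helper here just takes `hashCode()` modulo the parallelism instead. The consequence it illustrates is the same. A constant key such as `trip -> 0L` sends every record to one subtask. Round-robin distribution reaches every subtask; it is used here as a deterministic stand-in for `rebalance()`, whereas the `shuffle()` suggested in the thread picks channels at random.

```java
import java.util.Arrays;
import java.util.List;

public class KeyRoutingSketch {

    // Hypothetical simplification of keyBy routing: hash code modulo parallelism.
    // Any deterministic function of the key shares the relevant property:
    // equal keys always land on the same subtask.
    static int targetSubtask(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Number of records each downstream subtask receives when routing by key.
    static int[] recordsPerSubtaskByKey(List<?> keys, int parallelism) {
        int[] counts = new int[parallelism];
        for (Object key : keys) {
            counts[targetSubtask(key, parallelism)]++;
        }
        return counts;
    }

    // Round-robin distribution, a deterministic stand-in for rebalance().
    static int[] recordsPerSubtaskRoundRobin(int numRecords, int parallelism) {
        int[] counts = new int[parallelism];
        for (int i = 0; i < numRecords; i++) {
            counts[i % parallelism]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Long> constantKeys = List.of(0L, 0L, 0L, 0L); // like keyBy(trip -> 0L)
        System.out.println(Arrays.toString(recordsPerSubtaskByKey(constantKeys, 2)));
        System.out.println(Arrays.toString(recordsPerSubtaskRoundRobin(4, 2)));
    }
}
```

With parallelism 2, the constant key yields `[4, 0]`: subtask 1 receives no records. Under this model, an extractor placed after `keyBy` on that subtask never emits a watermark. Round-robin yields `[2, 2]`, so every downstream extractor sees data.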
