This explanation is exactly what I'm looking for, thanks! Is such an important rule documented anywhere in the official documentation?
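The rule in Fabian's reply quoted below can be modeled as a small Flink-free simulation. Each task's event-time clock is the minimum of the watermarks on its input channels. A forward edge connects A.i only to B.i, and a partitioned (keyBy) edge connects every B task to every C task. This is a simplified illustration under stated assumptions, not Flink's implementation: the class and method names and the watermark values are hypothetical, and Flink's handling of idle channels is ignored.

```java
import java.util.Arrays;

// Toy, Flink-free model of the rule in Fabian's reply (NOT Flink's API or code):
// a task forwards its watermark to every downstream task that might receive its
// records, and a task with several inputs takes the minimum of their watermarks.
// Class/method names and watermark values are hypothetical; idleness handling is ignored.
public class WatermarkBroadcastSketch {

    // Stand-in for "no watermark received yet" on an input channel.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // A task's event-time clock: the minimum watermark over all of its inputs.
    static long combinedWatermark(long... inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(NO_WATERMARK);
    }

    public static void main(String[] args) {
        // Hypothetical watermarks produced by A.1 and A.2.
        long a1 = 1_000L;
        long a2 = 4_000L;

        // A -> B is a forward (one-to-one) edge: B.1 only hears from A.1, B.2 only from A.2.
        long b1 = combinedWatermark(a1);
        long b2 = combinedWatermark(a2);

        // B -> C goes through keyBy (a partitioned edge): every B task sends its watermark
        // to every C task, whichever keys those C tasks happen to own.
        long c1 = combinedWatermark(b1, b2);
        long c2 = combinedWatermark(b1, b2);

        System.out.println("C.1 watermark = " + c1); // prints "C.1 watermark = 1000"
        System.out.println("C.2 watermark = " + c2); // prints "C.2 watermark = 1000"
    }
}
```

Both C tasks end up with the same watermark. This matches the 29 Apr question: behind a keyBy, every keyed task sees the same combined watermark regardless of which keys it owns, as long as every upstream task is emitting watermarks.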
On 2019/04/30 08:47:29, Fabian Hueske <[email protected]> wrote:
> An operator task broadcasts its current watermark to all downstream tasks
> that might receive its records.
> If you have the following code:
>
> DataStream<X> a = ...
> a.map(A).map(B).keyBy(....).window(C)
>
> and execute this with parallelism 2, your plan looks like this:
>
> A.1 -- B.1 --\--/-- C.1
>                X
> A.2 -- B.2 --/--\-- C.2
>
> A.1 will propagate its watermarks to B.1 because only B.1 will receive its
> output events.
> However, B.1 will propagate its watermarks to C.1 and C.2 because the
> output of B.1 is partitioned and all C tasks might receive output events
> from B.1.
>
> Best, Fabian
>
> On Mon, Apr 29, 2019 at 20:06, an0 <[email protected]> wrote:
>
> > Thanks very much. It definitely explains the problem I'm seeing. However,
> > there is something I need to confirm:
> > You say "Watermarks are broadcasted/forwarded anyway." Do you mean that, in
> > assignTimestampsAndWatermarks.keyBy.window, it doesn't matter what data
> > flows through a specific key's stream and all key streams have the same
> > watermarks? So time-wise, `window` behaves as if `keyBy` is not there at
> > all?
> >
> > On 2019/04/26 06:34:10, Dawid Wysakowicz <[email protected]> wrote:
> > > Hi,
> > >
> > > Watermarks are meta events that travel independently of data events.
> > >
> > > 1) If you assignTimestampsAndWatermarks before keyBy, all parallel
> > > instances of trips have some data (this is my assumption), so Watermarks
> > > can be generated. Afterwards, even if some of the keyed partitions have
> > > no data, Watermarks are broadcasted/forwarded anyway. In other words, if
> > > at some point Watermarks were generated for all partitions of a single
> > > stage, they will be forwarded beyond this point.
> > >
> > > 2) If you assignTimestampsAndWatermarks after keyBy, you try to assign
> > > watermarks for an empty partition, which produces no Watermarks at all
> > > for this partition; therefore there is no progress beyond this point.
> > >
> > > I hope this clarifies it a bit.
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 25/04/2019 16:49, an0 wrote:
> > > > If my understanding is correct, then why does
> > > > `assignTimestampsAndWatermarks` before `keyBy` work? The `timeWindowAll`
> > > > stream's input streams are task 1 and task 2, with task 2 idling, no matter
> > > > whether `assignTimestampsAndWatermarks` is before or after `keyBy`, because
> > > > whether task 2 receives elements depends only on the key distribution and
> > > > has nothing to do with timestamp assignment, right?
> > > >
> > > >                                                    /key 1 trips\
> > > >                                                   /             \
> > > > (A) trips--> assignTimestampsAndWatermarks-->keyBy               timeWindowAll
> > > >                                                   \    idle     /
> > > >                                                    \key 2 trips/
> > > >
> > > >                   /key 1 trips--> assignTimestampsAndWatermarks\
> > > >                  /                                              \
> > > > (B) trips-->keyBy                                                timeWindowAll
> > > >                  \                     idle                     /
> > > >                   \key 2 trips--> assignTimestampsAndWatermarks/
> > > >
> > > > How are things different between A and B from `timeWindowAll`'s
> > > > perspective?
> > > >
> > > > BTW, thanks for the webinar link, I'll check it later.
> > > >
> > > > On 2019/04/25 08:30:20, Dawid Wysakowicz <[email protected]> wrote:
> > > >> Hi,
> > > >>
> > > >> Yes, I think your explanation is correct. I can also recommend Seth's
> > > >> webinar where he talks about debugging Watermarks [1].
> > > >>
> > > >> Best,
> > > >>
> > > >> Dawid
> > > >>
> > > >> [1] https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
> > > >>
> > > >> On 22/04/2019 22:55, an0 wrote:
> > > >>> Thanks, I feel I'm getting closer to the truth.
> > > >>>
> > > >>> So parallelism is the cause? Say my parallelism is 2.
> > > >>> Does that mean I get 2 tasks running after `keyBy` even if all elements
> > > >>> have the same key and therefore go to 1 downstream task (say task 1)? And
> > > >>> is it the other task (task 2), with no incoming data, that keeps the
> > > >>> `timeWindowAll` stream from making progress? Because both task 1 and
> > > >>> task 2 are its input streams and one is idling, so its event time cannot
> > > >>> make progress?
> > > >>>
> > > >>> On 2019/04/22 01:57:39, Guowei Ma <[email protected]> wrote:
> > > >>>> Hi,
> > > >>>>
> > > >>>> A BoundedOutOfOrdernessTimestampExtractor can only send a WM after it
> > > >>>> has received at least one element.
> > > >>>>
> > > >>>> For after keyBy:
> > > >>>> Flink uses the hash code of the key and the parallelism of the downstream
> > > >>>> operator to decide which subtask receives the element. This means that if
> > > >>>> your key is always the same, all the sources will send the elements only
> > > >>>> to the same downstream task, for example only to
> > > >>>> BoundedOutOfOrdernessTimestampExtractor no. 3.
> > > >>>>
> > > >>>> For before keyBy:
> > > >>>> In your case, the Source and the BoundedOutOfOrdernessTimestampExtractors
> > > >>>> would be chained together, which means every
> > > >>>> BoundedOutOfOrdernessTimestampExtractor will receive elements.
> > > >>>>
> > > >>>> Best,
> > > >>>> Guowei
> > > >>>>
> > > >>>>
> > > >>>> an0 <[email protected]> wrote on Fri, Apr 19, 2019 at 10:41 PM:
> > > >>>>
> > > >>>>> Hi,
> > > >>>>>
> > > >>>>> First of all, thank you for the `shuffle()` tip. It works. However, I
> > > >>>>> still don't understand why it doesn't work without calling `shuffle()`.
> > > >>>>>
> > > >>>>> Why wouldn't all BoundedOutOfOrdernessTimestampExtractors receive trips?
> > > >>>>> All the trips have keys and timestamps. As I said in my reply to Paul, I
> > > >>>>> see the same watermarks being extracted.
> > > >>>>>
> > > >>>>> How could calling `assignTimestampsAndWatermarks` before vs. after
> > > >>>>> `keyBy` matter?
> > > >>>>> My understanding is that any specific window for a specific key always
> > > >>>>> receives exactly the same data, and the calling order of
> > > >>>>> `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
> > > >>>>>
> > > >>>>> To make `keyBy` as irrelevant as possible, I tried letting it always
> > > >>>>> return the same key so that there is only 1 keyed stream and it is exactly
> > > >>>>> the same as the original unkeyed stream. It still doesn't trigger windows:
> > > >>>>> ```java
> > > >>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > >>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
> > > >>>>> DataStream<Trip> featurizedUserTrips =
> > > >>>>>         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> > > >>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >>>>>                     @Override
> > > >>>>>                     public long extractTimestamp(Trip trip) {
> > > >>>>>                         return trip.endTime.getTime();
> > > >>>>>                     }
> > > >>>>>                 });
> > > >>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > >>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > >>>>> ```
> > > >>>>>
> > > >>>>> It makes no sense to me. Please help me understand why it doesn't work.
> > > >>>>> Thanks!
> > > >>>>>
> > > >>>>> On 2019/04/19 04:14:31, Guowei Ma <[email protected]> wrote:
> > > >>>>>> Hi,
> > > >>>>>> After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors
> > > >>>>>> receive elements (trips). If that is the case, a
> > > >>>>>> BoundedOutOfOrdernessTimestampExtractor that does not receive any element
> > > >>>>>> would not send a WM, so the timeWindowAll operator could not be
> > > >>>>>> triggered.
> > > >>>>>> You could add a shuffle() before the assignTimestampsAndWatermarks in
> > > >>>>>> your second case and check if the window is triggered.
> > > >>>>>> If it is triggered, you could check the distribution of elements
> > > >>>>>> generated by the source.
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>> Guowei
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> [email protected] <[email protected]> wrote on Fri, Apr 19, 2019 at 4:10 AM:
> > > >>>>>>
> > > >>>>>>> I don't think it is the watermark. I see the same watermarks from the
> > > >>>>>>> two versions of code.
> > > >>>>>>>
> > > >>>>>>> The processing on the keyed stream doesn't change event time at all. I
> > > >>>>>>> can simply change my code to use `map` on the keyed stream to return
> > > >>>>>>> the input data unchanged, so that the window operator receives exactly
> > > >>>>>>> the same data. The only difference is when I call
> > > >>>>>>> `assignTimestampsAndWatermarks`. The result is the same:
> > > >>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
> > > >>>>>>> ```java
> > > >>>>>>> DataStream<Trip> trips =
> > > >>>>>>>         env.addSource(consumer).assignTimestampsAndWatermarks(
> > > >>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >>>>>>>                     @Override
> > > >>>>>>>                     public long extractTimestamp(Trip trip) {
> > > >>>>>>>                         return trip.endTime.getTime();
> > > >>>>>>>                     }
> > > >>>>>>>                 });
> > > >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > >>>>>>> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> > > >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > >>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > >>>>>>> ```
> > > >>>>>>>
> > > >>>>>>> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> > > >>>>>>> ```java
> > > >>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > >>>>>>> DataStream<Trip> featurizedUserTrips =
> > > >>>>>>>         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> > > >>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >>>>>>>                     @Override
> > > >>>>>>>                     public long extractTimestamp(Trip trip) {
> > > >>>>>>>                         return trip.endTime.getTime();
> > > >>>>>>>                     }
> > > >>>>>>>                 });
> > > >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > >>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > >>>>>>> ```
> > > >>>>>>>
> > > >>>>>>> It feels like a bug to me, but I want to confirm it before I file a bug
> > > >>>>>>> report.
> > > >>>>>>>
> > > >>>>>>> On 2019/04/18 03:38:34, Paul Lam <[email protected]> wrote:
> > > >>>>>>>> Hi,
> > > >>>>>>>>
> > > >>>>>>>> Could you check the watermark of the window operator? One possible
> > > >>>>>>>> situation is that some of the keys are not getting enough input, so
> > > >>>>>>>> their watermarks remain below the window end time and hold the window
> > > >>>>>>>> operator watermark back. IMO, it's good practice to assign watermarks
> > > >>>>>>>> earlier in the data pipeline.
> > > >>>>>>>>
> > > >>>>>>>> Best,
> > > >>>>>>>> Paul Lam
> > > >>>>>>>>
> > > >>>>>>>>> On Apr 17, 2019, at 23:04, [email protected] wrote:
> > > >>>>>>>>>
> > > >>>>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
> > > >>>>>>>>> ```java
> > > >>>>>>>>> DataStream<Trip> trips =
> > > >>>>>>>>>         env.addSource(consumer).assignTimestampsAndWatermarks(
> > > >>>>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >>>>>>>>>                     @Override
> > > >>>>>>>>>                     public long extractTimestamp(Trip trip) {
> > > >>>>>>>>>                         return trip.endTime.getTime();
> > > >>>>>>>>>                     }
> > > >>>>>>>>>                 });
> > > >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> > > >>>>>>>>>         userTrips.process(new Featurization());
> > > >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > >>>>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > >>>>>>>>> ```
> > > >>>>>>>>>
> > > >>>>>>>>> But not after `keyBy` and `process`:
> > > >>>>>>>>> ```java
> > > >>>>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> > > >>>>>>>>>         userTrips.process(new Featurization()).assignTimestampsAndWatermarks(
> > > >>>>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> > > >>>>>>>>>                     @Override
> > > >>>>>>>>>                     public long extractTimestamp(FeaturizedTrip trip) {
> > > >>>>>>>>>                         return trip.endTime.getTime();
> > > >>>>>>>>>                     }
> > > >>>>>>>>>                 });
> > > >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > >>>>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > >>>>>>>>> ```
> > > >>>>>>>>> Windows are never triggered.
> > > >>>>>>>>>
> > > >>>>>>>>> Is it a bug or expected behavior? If the latter, where is it
> > > >>>>>>>>> documented?
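The (A) vs. (B) question that runs through this thread can be reproduced with a small Flink-free simulation. It uses parallelism 2 and keys every trip to the same value, as in the `trip -> 0L` experiment. Several assumptions are hypothetical and labeled as such:

- Keys are routed by `floorMod(hashCode, parallelism)`. Flink actually hashes keys into key groups, but with one constant key every record still lands on a single subtask.
- Each generator behaves roughly like a bounded-out-of-orderness extractor: it emits "max timestamp seen minus a bound" and emits nothing before its first element.
- The window operator's watermark is the minimum over its inputs.

This is a sketch of the mechanism, not Flink's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

// Toy, Flink-free model of pipelines (A) and (B) from this thread (NOT Flink's code).
// Hypothetical assumptions: parallelism 2, every trip keyed to 0L, key routing by
// floorMod(hashCode, parallelism) instead of Flink's murmur-hashed key groups, and a
// generator that emits (max timestamp seen - bound) only once it has seen an element.
public class KeyByWatermarkSketch {

    static final long NO_WATERMARK = Long.MIN_VALUE; // nothing emitted yet
    static final long MAX_OUT_OF_ORDERNESS = 10L;    // hypothetical bound

    // Watermark of one generator instance after it has seen the given timestamps.
    static long generatorWatermark(List<Long> timestamps) {
        OptionalLong max = timestamps.stream().mapToLong(Long::longValue).max();
        return max.isPresent() ? max.getAsLong() - MAX_OUT_OF_ORDERNESS : NO_WATERMARK;
    }

    // Simplified stand-in for keyBy's routing of a key to a downstream subtask.
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // A task with several inputs takes the minimum of their watermarks.
    static long minWatermark(long[] watermarks) {
        return Arrays.stream(watermarks).min().orElse(NO_WATERMARK);
    }

    // (A): generators run in the source tasks, before keyBy.
    static long windowWatermarkA(List<List<Long>> sourcePartitions, int parallelism) {
        long[] sourceWms = new long[sourcePartitions.size()];
        for (int i = 0; i < sourceWms.length; i++) {
            sourceWms[i] = generatorWatermark(sourcePartitions.get(i));
        }
        // keyBy is a partitioned edge: every keyed task hears from every source task,
        // so even a keyed task that receives no records still gets a watermark.
        long[] keyedWms = new long[parallelism];
        Arrays.fill(keyedWms, minWatermark(sourceWms));
        // timeWindowAll has all keyed tasks as inputs.
        return minWatermark(keyedWms);
    }

    // (B): keyBy first, generators run in the keyed tasks.
    static long windowWatermarkB(List<List<Long>> sourcePartitions, int parallelism) {
        List<List<Long>> keyedPartitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            keyedPartitions.add(new ArrayList<>());
        }
        for (List<Long> partition : sourcePartitions) {
            for (long ts : partition) {
                long key = 0L; // every trip gets the same key
                keyedPartitions.get(subtaskFor(key, parallelism)).add(ts);
            }
        }
        // A keyed task that never received an element reports NO_WATERMARK.
        long[] keyedWms = new long[parallelism];
        for (int i = 0; i < parallelism; i++) {
            keyedWms[i] = generatorWatermark(keyedPartitions.get(i));
        }
        return minWatermark(keyedWms);
    }

    public static void main(String[] args) {
        // Hypothetical event timestamps read by the two source tasks.
        List<List<Long>> sources = Arrays.asList(
                Arrays.asList(100L, 120L),
                Arrays.asList(110L, 130L));
        System.out.println("(A) window watermark: " + windowWatermarkA(sources, 2)); // prints 110
        System.out.println("(B) window watermark: " + windowWatermarkB(sources, 2)); // prints -9223372036854775808
    }
}
```

In (B), keyed task 1 never sees a record, so it never emits a watermark. The minimum at `timeWindowAll` therefore stays at `Long.MIN_VALUE`, and no window fires. In (A), keyed task 1 also sees no records, but it still inherits the source tasks' watermarks across the keyBy edge, so the window operator's clock advances.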
