Thanks, but it doesn't seem to cover this rule:

--- Quote
Watermarks are generated at, or directly after, source functions. Each parallel subtask of a source function usually generates its watermarks independently. These watermarks define the event time at that particular parallel source.

As the watermarks flow through the streaming program, they advance the event time at the operators where they arrive. Whenever an operator advances its event time, it generates a new watermark downstream for its successor operators. Some operators consume multiple input streams; a union, for example, or operators following a keyBy(…) or partition(…) function. Such an operator’s current event time is the minimum of its input streams’ event times. As its input streams update their event times, so does the operator.
--- End Quote

The most relevant part, I believe, is this:
"Some operators consume multiple input streams…operators following a keyBy(…) function. Such an operator’s current event time is the minimum of its input streams’ event times."

But the wording "current event time is the minimum of its input streams’ event times" actually implies that the input streams (produced by keyBy) have different watermarks, the exact opposite of what you just explained.

On 2019/05/03 07:32:07, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi,
>
> this should be covered here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#watermarks-in-parallel-streams
>
> Best, Fabian
>
> On Thu, May 2, 2019 at 17:48, an0 <an0...@gmail.com> wrote:
>
> > This explanation is exactly what I'm looking for, thanks! Is such an
> > important rule documented anywhere in the official documentation?
> >
> > On 2019/04/30 08:47:29, Fabian Hueske <fhue...@gmail.com> wrote:
> > > An operator task broadcasts its current watermark to all downstream tasks
> > > that might receive its records.
> > > If you have the following code:
> > >
> > > DataStream<X> a = ...
> > > a.map(A).map(B).keyBy(....).window(C)
> > >
> > > and execute this with parallelism 2, your plan looks like this:
> > >
> > > A.1 -- B.1 --\--/-- C.1
> > >               X
> > > A.2 -- B.2 --/--\-- C.2
> > >
> > > A.1 will propagate its watermarks to B.1 because only B.1 will receive its
> > > output events.
> > > However, B.1 will propagate its watermarks to C.1 and C.2 because the
> > > output of B.1 is partitioned and all C tasks might receive output events
> > > from B.1.
> > >
> > > Best, Fabian
> > >
> > > On Mon, Apr 29, 2019 at 20:06, an0 <an0...@gmail.com> wrote:
> > >
> > > > Thanks very much. It definitely explains the problem I'm seeing. However,
> > > > there is something I need to confirm:
> > > > You say "Watermarks are broadcasted/forwarded anyway." Do you mean, in
> > > > assignTimestampsAndWatermarks.keyBy.window, it doesn't matter what data
> > > > flows through a specific key's stream, all key streams have the same
> > > > watermarks? So time-wise, `window` behaves as if `keyBy` is not there at
> > > > all?
> > > >
> > > > On 2019/04/26 06:34:10, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> > > > > Hi,
> > > > >
> > > > > Watermarks are meta events that travel independently of data events.
> > > > >
> > > > > 1) If you assignTimestampsAndWatermarks before keyBy, all parallel
> > > > > instances of trips have some data (this is my assumption) so Watermarks
> > > > > can be generated. Afterwards, even if some of the keyed partitions have
> > > > > no data, Watermarks are broadcasted/forwarded anyway. In other words, if
> > > > > at some point Watermarks were generated for all partitions of a single
> > > > > stage, they will be forwarded beyond this point.
> > > > >
> > > > > 2) If you assignTimestampsAndWatermarks after keyBy, you try to assign
> > > > > watermarks for an empty partition, which produces no Watermarks at all
> > > > > for this partition; therefore there is no progress beyond this point.
> > > > >
> > > > > I hope this clarifies it a bit.
> > > > >
> > > > > Best,
> > > > >
> > > > > Dawid
> > > > >
> > > > > On 25/04/2019 16:49, an0 wrote:
> > > > > > If my understanding is correct, then why does
> > > > > > `assignTimestampsAndWatermarks` before `keyBy` work?
> > > > > > The `timeWindowAll` stream's input streams are task 1 and task 2, with
> > > > > > task 2 idling, no matter whether `assignTimestampsAndWatermarks` is
> > > > > > before or after `keyBy`, because whether task 2 receives elements depends
> > > > > > only on the key distribution and has nothing to do with timestamp
> > > > > > assignment, right?
> > > > > >
> > > > > >                                                     /key 1 trips\
> > > > > >                                                    /             \
> > > > > > (A) trips--> assignTimestampsAndWatermarks-->keyBy                 timeWindowAll
> > > > > >                                                    \     idle    /
> > > > > >                                                     \key 2 trips/
> > > > > >
> > > > > >                    /key 1 trips--> assignTimestampsAndWatermarks\
> > > > > >                   /                                              \
> > > > > > (B) trips-->keyBy                                                  timeWindowAll
> > > > > >                   \                     idle                     /
> > > > > >                    \key 2 trips--> assignTimestampsAndWatermarks/
> > > > > >
> > > > > > How are things different between A and B from `timeWindowAll`'s
> > > > > > perspective?
> > > > > >
> > > > > > BTW, thanks for the webinar link, I'll check it later.
> > > > > >
> > > > > > On 2019/04/25 08:30:20, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> > > > > >> Hi,
> > > > > >>
> > > > > >> Yes, I think your explanation is correct. I can also recommend Seth's
> > > > > >> webinar where he talks about debugging Watermarks[1]
> > > > > >>
> > > > > >> Best,
> > > > > >>
> > > > > >> Dawid
> > > > > >>
> > > > > >> [1]
> > > > > >> https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
> > > > > >>
> > > > > >> On 22/04/2019 22:55, an0 wrote:
> > > > > >>> Thanks, I feel I'm getting closer to the truth.
> > > > > >>>
> > > > > >>> So parallelism is the cause? Say my parallelism is 2. Does that mean
> > > > > >>> I get 2 tasks running after `keyBy` even if all elements have the same
> > > > > >>> key and so go to 1 downstream task (say task 1)? And it is the other
> > > > > >>> task (task 2), with no incoming data, that makes the `timeWindowAll`
> > > > > >>> stream unable to progress? Because both task 1 and task 2 are its input
> > > > > >>> streams and one is idling, so its event time cannot make progress?
> > > > > >>>
> > > > > >>> On 2019/04/22 01:57:39, Guowei Ma <guowei....@gmail.com> wrote:
> > > > > >>>> Hi,
> > > > > >>>>
> > > > > >>>> A BoundedOutOfOrdernessTimestampExtractor can send a WM only after it
> > > > > >>>> receives at least one element.
> > > > > >>>>
> > > > > >>>> For after keyBy:
> > > > > >>>> Flink uses the hash code of the key and the parallelism of the
> > > > > >>>> downstream operator to decide which subtask receives the element. This
> > > > > >>>> means that if your key is always the same, all the sources will send
> > > > > >>>> their elements to the same downstream task, for example only the no. 3
> > > > > >>>> BoundedOutOfOrdernessTimestampExtractor.
> > > > > >>>>
> > > > > >>>> For before keyBy:
> > > > > >>>> In your case, the Source and BoundedOutOfOrdernessTimestampExtractors
> > > > > >>>> would be chained together, which means every
> > > > > >>>> BoundedOutOfOrdernessTimestampExtractor will receive elements.
> > > > > >>>>
> > > > > >>>> Best,
> > > > > >>>> Guowei
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> On Fri, Apr 19, 2019 at 10:41 PM, an0 <an0...@gmail.com> wrote:
> > > > > >>>>
> > > > > >>>>> Hi,
> > > > > >>>>>
> > > > > >>>>> First of all, thank you for the `shuffle()` tip. It works. However, I
> > > > > >>>>> still don't understand why it doesn't work without calling `shuffle()`.
> > > > > >>>>>
> > > > > >>>>> Why would not all BoundedOutOfOrdernessTimestampExtractors receive
> > > > > >>>>> trips? All the trips have keys and timestamps. As I said in my reply to
> > > > > >>>>> Paul, I see the same watermarks being extracted.
> > > > > >>>>>
> > > > > >>>>> How could calling `assignTimestampsAndWatermarks` before vs. after
> > > > > >>>>> `keyBy` matter?
> > > > > >>>>> My understanding is that any specific window for a specific key always
> > > > > >>>>> receives exactly the same data, and the calling order of
> > > > > >>>>> `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
> > > > > >>>>>
> > > > > >>>>> To make `keyBy` as irrelevant as possible, I tried letting it always
> > > > > >>>>> return the same key so that there is only 1 keyed stream and it is
> > > > > >>>>> exactly the same as the original unkeyed stream. It still doesn't
> > > > > >>>>> trigger windows:
> > > > > >>>>> ```java
> > > > > >>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > > > >>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
> > > > > >>>>> DataStream<Trip> featurizedUserTrips =
> > > > > >>>>>         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > > >>>>>     @Override
> > > > > >>>>>     public long extractTimestamp(Trip trip) {
> > > > > >>>>>         return trip.endTime.getTime();
> > > > > >>>>>     }
> > > > > >>>>> });
> > > > > >>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > > > >>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > >>>>> ```
> > > > > >>>>>
> > > > > >>>>> It makes no sense to me. Please help me understand why it doesn't work.
> > > > > >>>>> Thanks!
> > > > > >>>>>
> > > > > >>>>> On 2019/04/19 04:14:31, Guowei Ma <guowei....@gmail.com> wrote:
> > > > > >>>>>> Hi,
> > > > > >>>>>> After keyBy, maybe only some of the
> > > > > >>>>>> BoundedOutOfOrdernessTimestampExtractors receive the elements (trips).
> > > > > >>>>>> If that is the case, a BoundedOutOfOrdernessTimestampExtractor that does
> > > > > >>>>>> not receive any element would not send the WM. Because of that, the
> > > > > >>>>>> timeWindowAll operator could not be triggered.
> > > > > >>>>>> You could add a shuffle() before the assignTimestampsAndWatermarks in
> > > > > >>>>>> your second case and check if the window is triggered. If it is
> > > > > >>>>>> triggered, you could check the distribution of elements generated by the
> > > > > >>>>>> source.
> > > > > >>>>>>
> > > > > >>>>>> Best,
> > > > > >>>>>> Guowei
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>> On Fri, Apr 19, 2019 at 4:10 AM, an0...@gmail.com <an0...@gmail.com> wrote:
> > > > > >>>>>>
> > > > > >>>>>>> I don't think it is the watermark. I see the same watermarks from the
> > > > > >>>>>>> two versions of code.
> > > > > >>>>>>>
> > > > > >>>>>>> The processing on the keyed stream doesn't change event time at all. I
> > > > > >>>>>>> can simply change my code to use `map` on the keyed stream to return
> > > > > >>>>>>> the input data unchanged, so that the window operator receives exactly
> > > > > >>>>>>> the same data. The only difference is when I do
> > > > > >>>>>>> `assignTimestampsAndWatermarks`.
> > > > > >>>>>>> The result is the same: `assignTimestampsAndWatermarks` before `keyBy`
> > > > > >>>>>>> works:
> > > > > >>>>>>> ```java
> > > > > >>>>>>> DataStream<Trip> trips =
> > > > > >>>>>>>         env.addSource(consumer).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > > >>>>>>>     @Override
> > > > > >>>>>>>     public long extractTimestamp(Trip trip) {
> > > > > >>>>>>>         return trip.endTime.getTime();
> > > > > >>>>>>>     }
> > > > > >>>>>>> });
> > > > > >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > > >>>>>>> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> > > > > >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > > > >>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > >>>>>>> ```
> > > > > >>>>>>>
> > > > > >>>>>>> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> > > > > >>>>>>> ```java
> > > > > >>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > > > >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > > >>>>>>> DataStream<Trip> featurizedUserTrips =
> > > > > >>>>>>>         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > > >>>>>>>     @Override
> > > > > >>>>>>>     public long extractTimestamp(Trip trip) {
> > > > > >>>>>>>         return trip.endTime.getTime();
> > > > > >>>>>>>     }
> > > > > >>>>>>> });
> > > > > >>>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > > > >>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > >>>>>>> ```
> > > > > >>>>>>>
> > > > > >>>>>>> It feels like a bug to me, but I want to confirm it before I file the
> > > > > >>>>>>> bug report.
> > > > > >>>>>>>
> > > > > >>>>>>> On 2019/04/18 03:38:34, Paul Lam <paullin3...@gmail.com> wrote:
> > > > > >>>>>>>> Hi,
> > > > > >>>>>>>>
> > > > > >>>>>>>> Could you check the watermark of the window operator? One possible
> > > > > >>>>>>>> situation would be that some of the keys are not getting enough
> > > > > >>>>>>>> inputs, so their watermarks remain below the window end time and hold
> > > > > >>>>>>>> the window operator watermark back. IMO, it’s a good practice to
> > > > > >>>>>>>> assign watermarks earlier in the data pipeline.
> > > > > >>>>>>>>
> > > > > >>>>>>>> Best,
> > > > > >>>>>>>> Paul Lam
> > > > > >>>>>>>>
> > > > > >>>>>>>> On Apr 17, 2019, at 23:04, an0...@gmail.com wrote:
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > > >>>>>>>>> ```java
> > > > > >>>>>>>>> DataStream<Trip> trips =
> > > > > >>>>>>>>>         env.addSource(consumer).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > > >>>>>>>>>     @Override
> > > > > >>>>>>>>>     public long extractTimestamp(Trip trip) {
> > > > > >>>>>>>>>         return trip.endTime.getTime();
> > > > > >>>>>>>>>     }
> > > > > >>>>>>>>> });
> > > > > >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > > >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips = userTrips.process(new Featurization());
> > > > > >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > > > >>>>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > >>>>>>>>> ```
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> But not after `keyBy` and `process`:
> > > > > >>>>>>>>> ```java
> > > > > >>>>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> > > > > >>>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > > >>>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> > > > > >>>>>>>>>         userTrips.process(new Featurization()).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> > > > > >>>>>>>>>     @Override
> > > > > >>>>>>>>>     public long extractTimestamp(FeaturizedTrip trip) {
> > > > > >>>>>>>>>         return trip.endTime.getTime();
> > > > > >>>>>>>>>     }
> > > > > >>>>>>>>> });
> > > > > >>>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > > > >>>>>>>>>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > >>>>>>>>> ```
> > > > > >>>>>>>>> Windows are never triggered.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Is it a bug or expected behavior? If the latter, where is it
> > > > > >>>>>>>>> documented?
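
For anyone reading this thread later, the rule discussed above can be sketched in plain Java. This is a toy model under stated assumptions, not Flink's implementation: the `WatermarkMinDemo` and `Operator` names, the method names, and the channel numbering are all hypothetical and invented for illustration. It assumes, per the documentation quoted at the top, that an operator's event time is the minimum of the latest watermarks seen on each of its input channels, and that a channel which has never delivered a watermark holds the operator at `Long.MIN_VALUE`.

```java
import java.util.Arrays;

/** Toy model (not Flink code) of an operator that tracks one watermark per input channel. */
final class WatermarkMinDemo {

    static final class Operator {
        // Latest watermark seen on each input channel; MIN_VALUE means "none yet".
        private final long[] channelWatermarks;
        private long current = Long.MIN_VALUE;

        Operator(int inputChannels) {
            channelWatermarks = new long[inputChannels];
            Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        }

        /** Records a watermark from one channel; returns true if the operator's event time advanced. */
        boolean onWatermark(int channel, long watermark) {
            channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
            long min = Arrays.stream(channelWatermarks).min().getAsLong();
            if (min > current) {
                current = min;
                return true;
            }
            return false;
        }

        long currentWatermark() {
            return current;
        }
    }

    public static void main(String[] args) {
        // Case (B): watermarks assigned after keyBy. Only channel 0 ever gets data,
        // so channel 1 never emits a watermark and the window operator is stuck.
        Operator windowAllB = new Operator(2);
        windowAllB.onWatermark(0, 1_000L);
        windowAllB.onWatermark(0, 2_000L);
        System.out.println("B advanced: " + (windowAllB.currentWatermark() > Long.MIN_VALUE)); // false

        // Case (A): watermarks assigned before keyBy and broadcast downstream,
        // so both channels deliver watermarks and the minimum can advance.
        Operator windowAllA = new Operator(2);
        windowAllA.onWatermark(0, 1_000L);
        windowAllA.onWatermark(1, 1_500L);
        System.out.println("A watermark: " + windowAllA.currentWatermark()); // 1000
    }
}
```

In this model the "minimum of its input streams' event times" wording and the broadcast explanation do not conflict: each upstream task sends its own watermark on its own channel, and the downstream operator takes the minimum across channels. Case (A) additionally relies on the assumption stated earlier in the thread that every parallel source instance receives some data.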