Hi,

Yes, I think your explanation is correct. I can also recommend Seth's webinar, where he talks about debugging watermarks [1].
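For illustration, the explanation confirmed here can be played through in a toy model. This is a plain-Java sketch, not Flink code: the class `WatermarkSketch`, its method names, and the `hashCode()` modulo parallelism routing are assumptions made for the example (Flink's actual routing goes through key groups). It only models two ideas from the thread: a constant key sends every element to one extractor subtask, and an operator with several inputs can only advance to the minimum of their watermarks.

```java
import java.util.Arrays;

/** Toy model of the behaviour discussed in this thread. Not Flink code. */
final class WatermarkSketch {
    /** Value used for a subtask that has not produced any watermark yet. */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** Simplified keyed routing (assumption): subtask = hashCode mod parallelism. */
    public static int[] routeByKey(Object[] keys, int parallelism) {
        int[] target = new int[keys.length];
        for (int i = 0; i < keys.length; i++) {
            target[i] = Math.floorMod(keys[i].hashCode(), parallelism);
        }
        return target;
    }

    /** Round-robin routing, standing in for any distribution that reaches every subtask. */
    public static int[] routeRoundRobin(int elementCount, int parallelism) {
        int[] target = new int[elementCount];
        for (int i = 0; i < elementCount; i++) {
            target[i] = i % parallelism;
        }
        return target;
    }

    /**
     * Each extractor subtask tracks the largest timestamp it has seen and reports
     * (max - maxOutOfOrderness). The single downstream window operator takes the
     * minimum over all subtasks, so one idle subtask pins the result at NO_WATERMARK.
     */
    public static long windowOperatorWatermark(
            long[] timestamps, int[] subtaskOf, int parallelism, long maxOutOfOrderness) {
        long[] maxSeen = new long[parallelism];
        Arrays.fill(maxSeen, NO_WATERMARK);
        for (int i = 0; i < timestamps.length; i++) {
            int s = subtaskOf[i];
            maxSeen[s] = Math.max(maxSeen[s], timestamps[i]);
        }
        long combined = Long.MAX_VALUE;
        for (long max : maxSeen) {
            long wm = (max == NO_WATERMARK) ? NO_WATERMARK : max - maxOutOfOrderness;
            combined = Math.min(combined, wm);
        }
        return combined;
    }

    public static void main(String[] args) {
        long[] ts = {1000L, 2000L, 3000L, 4000L};
        Object[] sameKey = {0L, 0L, 0L, 0L}; // like keyBy(trip -> 0L)

        long keyed = windowOperatorWatermark(ts, routeByKey(sameKey, 2), 2, 500L);
        long spread = windowOperatorWatermark(ts, routeRoundRobin(ts.length, 2), 2, 500L);

        System.out.println("extractor after keyBy, constant key: " + keyed);
        System.out.println("elements spread over all subtasks:   " + spread);
    }
}
```

In this model the constant key leaves the second subtask without any watermark, so the combined value stays at `Long.MIN_VALUE` and no window could ever fire. With the elements spread over both subtasks the combined watermark is 2500.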

Best,
Dawid

[1] https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial

On 22/04/2019 22:55, an0 wrote:
> Thanks, I feel I'm getting closer to the truth.
>
> So parallelism is the cause? Say my parallelism is 2. Does that mean I get 2
> tasks running after `keyBy`, even if all elements have the same key and so go
> to only 1 downstream task (say task 1)? And is it the other task (task 2),
> with no incoming data, that makes the `timeWindowAll` stream unable to
> progress? Because both task 1 and task 2 are its input streams and one is
> idle, so its event time cannot make progress?
>
> On 2019/04/22 01:57:39, Guowei Ma <guowei....@gmail.com> wrote:
>> Hi,
>>
>> A BoundedOutOfOrdernessTimestampExtractor can send a WM (watermark) only
>> after it has received an element.
>>
>> For after keyBy:
>> Flink uses the hash code of the key and the parallelism of the downstream
>> operator to decide which subtask receives the element. This means that if
>> your key is always the same, all the sources will send their elements to
>> the same downstream task, for example only
>> BoundedOutOfOrdernessTimestampExtractor no. 3.
>>
>> For before keyBy:
>> In your case, the Source and the BoundedOutOfOrdernessTimestampExtractors
>> would be chained together, which means every
>> BoundedOutOfOrdernessTimestampExtractor will receive elements.
>>
>> Best,
>> Guowei
>>
>>
>> an0 <an0...@gmail.com> wrote on Fri, Apr 19, 2019 at 10:41 PM:
>>
>>> Hi,
>>>
>>> First of all, thank you for the `shuffle()` tip. It works. However, I
>>> still don't understand why it doesn't work without calling `shuffle()`.
>>>
>>> Why would not all BoundedOutOfOrdernessTimestampExtractors receive trips?
>>> All the trips have keys and timestamps. As I said in my reply to Paul, I see
>>> the same watermarks being extracted.
>>>
>>> How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy`
>>> matter?
>>> My understanding is that any specific window for a specific key always
>>> receives exactly the same data, and the calling order of
>>> `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
>>>
>>> To make `keyBy` as irrelevant as possible, I tried letting it always
>>> return the same key so that there is only 1 keyed stream and it is exactly
>>> the same as the original unkeyed stream. It still doesn't trigger windows:
>>> ```java
>>> DataStream<Trip> trips = env.addSource(consumer);
>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
>>> DataStream<Trip> featurizedUserTrips =
>>>     userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
>>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
>>>             @Override
>>>             public long extractTimestamp(Trip trip) {
>>>                 return trip.endTime.getTime();
>>>             }
>>>         });
>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
>>> ```
>>>
>>> It makes no sense to me. Please help me understand why it doesn't work.
>>> Thanks!
>>>
>>> On 2019/04/19 04:14:31, Guowei Ma <guowei....@gmail.com> wrote:
>>>> Hi,
>>>> After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors
>>>> receive the elements (trips). If that is the case, a
>>>> BoundedOutOfOrdernessTimestampExtractor that does not receive any element
>>>> would not send a WM, so the timeWindowAll operator could not be
>>>> triggered.
>>>> You could add a shuffle() before the assignTimestampsAndWatermarks in your
>>>> second case and check if the window is triggered. If it is triggered,
>>>> you could check the distribution of the elements generated by the source.
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>>
>>>> an0...@gmail.com <an0...@gmail.com> wrote on Fri, Apr 19, 2019 at 4:10 AM:
>>>>
>>>>> I don't think it is the watermark. I see the same watermarks from the two
>>>>> versions of code.
>>>>>
>>>>> The processing on the keyed stream doesn't change event time at all. I can
>>>>> simply change my code to use `map` on the keyed stream to return the
>>>>> input data unchanged, so that the window operator receives exactly the
>>>>> same data. The only difference is when I do
>>>>> `assignTimestampsAndWatermarks`. The result is the same.
>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
>>>>> ```java
>>>>> DataStream<Trip> trips =
>>>>>     env.addSource(consumer).assignTimestampsAndWatermarks(
>>>>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
>>>>>             @Override
>>>>>             public long extractTimestamp(Trip trip) {
>>>>>                 return trip.endTime.getTime();
>>>>>             }
>>>>>         });
>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
>>>>> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
>>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
>>>>> ```
>>>>>
>>>>> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
>>>>> ```java
>>>>> DataStream<Trip> trips = env.addSource(consumer);
>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
>>>>> DataStream<Trip> featurizedUserTrips =
>>>>>     userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
>>>>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
>>>>>             @Override
>>>>>             public long extractTimestamp(Trip trip) {
>>>>>                 return trip.endTime.getTime();
>>>>>             }
>>>>>         });
>>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
>>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
>>>>> ```
>>>>>
>>>>> It feels like a bug to me, but I want to confirm it before I file the bug
>>>>> report.
>>>>>
>>>>> On 2019/04/18 03:38:34, Paul Lam <paullin3...@gmail.com> wrote:
>>>>>> Hi,
>>>>>>
>>>>>> Could you check the watermark of the window operator?
>>>>>> One possible situation would be that some of the keys are not getting
>>>>>> enough inputs, so their watermarks remain below the window end time and
>>>>>> hold the window operator watermark back. IMO, it’s good practice to
>>>>>> assign watermarks earlier in the data pipeline.
>>>>>>
>>>>>> Best,
>>>>>> Paul Lam
>>>>>>
>>>>>>> On Apr 17, 2019, at 23:04, an0...@gmail.com wrote:
>>>>>>>
>>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
>>>>>>> ```java
>>>>>>> DataStream<Trip> trips =
>>>>>>>     env.addSource(consumer).assignTimestampsAndWatermarks(
>>>>>>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
>>>>>>>             @Override
>>>>>>>             public long extractTimestamp(Trip trip) {
>>>>>>>                 return trip.endTime.getTime();
>>>>>>>             }
>>>>>>>         });
>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
>>>>>>>     userTrips.process(new Featurization());
>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
>>>>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
>>>>>>> ```
>>>>>>>
>>>>>>> But not after `keyBy` and `process`:
>>>>>>> ```java
>>>>>>> DataStream<Trip> trips = env.addSource(consumer);
>>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
>>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
>>>>>>>     userTrips.process(new Featurization()).assignTimestampsAndWatermarks(
>>>>>>>         new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
>>>>>>>             @Override
>>>>>>>             public long extractTimestamp(FeaturizedTrip trip) {
>>>>>>>                 return trip.endTime.getTime();
>>>>>>>             }
>>>>>>>         });
>>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
>>>>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
>>>>>>> ```
>>>>>>> Windows are never triggered.
>>>>>>>
>>>>>>> Is it a bug or expected behavior? If the latter, where is it
>>>>>>> documented?