If my understanding is correct, then why does calling `assignTimestampsAndWatermarks`
before `keyBy` work? The input streams of the `timeWindowAll` stream are task 1
and task 2, with task 2 idle, no matter whether `assignTimestampsAndWatermarks`
comes before or after `keyBy`. Whether task 2 receives elements depends only on
the key distribution and has nothing to do with timestamp assignment, right?

                                                         /-- key 1 trips ---------\
    (A) trips --> assignTimestampsAndWatermarks --> keyBy                           timeWindowAll
                                                         \-- key 2 trips (idle) --/

                       /-- key 1 trips --> assignTimestampsAndWatermarks ---------\
    (B) trips --> keyBy                                                              timeWindowAll
                       \-- key 2 trips (idle) --> assignTimestampsAndWatermarks --/

How are A and B different from `timeWindowAll`'s perspective?
BTW, thanks for the webinar link; I'll check it out later.
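A rough way to compare (A) and (B) is the plain-Java simulation below. It is a sketch under assumptions, not Flink code: it assumes that `keyBy` hash-partitions records but broadcasts watermarks to every downstream channel, that an operator's event time is the minimum watermark across its input channels, and that an extractor which has seen no element contributes no watermark (modelled as `Long.MIN_VALUE`). The class `TopologySim` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of topologies (A) and (B); not Flink code.
// Setup: two source subtasks, keyBy with parallelism 2, and a constant key,
// so every record goes to keyed task 1 while keyed task 2 stays idle.
class TopologySim {
    static final long NO_WATERMARK = Long.MIN_VALUE;
    static final long MAX_OUT_OF_ORDERNESS = 10L;

    // Bounded-out-of-orderness watermark for the timestamps one extractor saw.
    static long extractorWatermark(List<Long> timestamps) {
        if (timestamps.isEmpty()) {
            return NO_WATERMARK; // no element seen, so no watermark to contribute
        }
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {
            max = Math.max(max, ts);
        }
        return max - MAX_OUT_OF_ORDERNESS;
    }

    // (A): extractors chained to the sources; watermarks cross keyBy as broadcasts.
    static long topologyA(List<List<Long>> perSource) {
        long minOverSources = Long.MAX_VALUE;
        for (List<Long> source : perSource) {
            minOverSources = Math.min(minOverSources, extractorWatermark(source));
        }
        // Both keyed tasks receive every source's watermark, so both forward
        // the same value, including task 2, which never receives a record.
        long task1 = minOverSources;
        long task2 = minOverSources;
        return Math.min(task1, task2); // event time seen by timeWindowAll
    }

    // (B): extractors run after keyBy; all records land on task 1.
    static long topologyB(List<List<Long>> perSource) {
        List<Long> task1Records = new ArrayList<>();
        for (List<Long> source : perSource) {
            task1Records.addAll(source);
        }
        long task1 = extractorWatermark(task1Records);
        long task2 = extractorWatermark(new ArrayList<Long>()); // idle extractor
        return Math.min(task1, task2); // event time seen by timeWindowAll
    }

    public static void main(String[] args) {
        List<List<Long>> perSource = Arrays.asList(
                Arrays.asList(100L, 120L), // timestamps read by source subtask 1
                Arrays.asList(110L, 90L)); // timestamps read by source subtask 2
        System.out.println("A: " + topologyA(perSource)); // A: 100
        System.out.println("B: " + topologyB(perSource)); // B: -9223372036854775808
    }
}
```

Under those assumptions, task 2 in (A) still forwards the watermarks it receives from both sources even though it gets no records, whereas in (B) its extractor has nothing to derive a watermark from, so `timeWindowAll` never advances.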
On 2019/04/25 08:30:20, Dawid Wysakowicz wrote:
> Hi,
>
> Yes, I think your explanation is correct. I can also recommend Seth's
> webinar where he talks about debugging watermarks [1].
>
> Best,
>
> Dawid
>
> [1]
> https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
>
> On 22/04/2019 22:55, an0 wrote:
> > Thanks, I feel I'm getting closer to the truth.
> >
> > So is parallelism the cause? Say my parallelism is 2. Does that mean I get
> > 2 tasks running after `keyBy` even if all elements have the same key and so
> > go to one downstream task (say task 1)? And is it the other task (task 2),
> > with no incoming data, that keeps the `timeWindowAll` stream from making
> > progress? Because both task 1 and task 2 are its input streams and one is
> > idle, its event time cannot advance?
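The "one idle input holds back event time" rule in this question can be modelled in a few lines of plain Java. This is a simplified sketch, assuming (as I understand Flink's runtime) that an operator tracks the latest watermark per input channel and uses the minimum across channels as its own event time; `CombinedWatermark` is a hypothetical name, not a Flink class.

```java
import java.util.Arrays;

// Hypothetical model of how an operator with several inputs combines watermarks.
class CombinedWatermark {
    // Latest watermark seen on each input channel; a channel that never
    // emitted one stays at Long.MIN_VALUE.
    private final long[] perChannel;

    CombinedWatermark(int numChannels) {
        perChannel = new long[numChannels];
        Arrays.fill(perChannel, Long.MIN_VALUE);
    }

    // Records a watermark from one channel; a channel's watermark never moves backwards.
    void onWatermark(int channel, long watermark) {
        perChannel[channel] = Math.max(perChannel[channel], watermark);
    }

    // The operator's event time is the minimum over all input channels.
    long current() {
        long min = Long.MAX_VALUE;
        for (long wm : perChannel) {
            min = Math.min(min, wm);
        }
        return min;
    }

    public static void main(String[] args) {
        CombinedWatermark windowAll = new CombinedWatermark(2);
        windowAll.onWatermark(0, 1_000L);        // task 1 keeps advancing
        System.out.println(windowAll.current()); // -9223372036854775808 (Long.MIN_VALUE): task 2 is idle
        windowAll.onWatermark(1, 500L);          // task 2 finally emits a watermark
        System.out.println(windowAll.current()); // 500
    }
}
```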
> >
> > On 2019/04/22 01:57:39, Guowei Ma wrote:
> >> Hi,
> >>
> >> A BoundedOutOfOrdernessTimestampExtractor can only send a WM (watermark)
> >> after it has received at least one element.
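The behaviour described here can be sketched in plain Java. This is a hypothetical re-implementation of the bounded-out-of-orderness arithmetic (watermark = highest timestamp seen minus the allowed out-of-orderness), not Flink's `BoundedOutOfOrdernessTimestampExtractor` itself; the point is only that an instance which has never seen an element has no meaningful watermark, modelled here as `Long.MIN_VALUE`.

```java
// Hypothetical sketch of bounded-out-of-orderness watermarking; not Flink's class.
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long maxTimestamp;           // only meaningful once seenElement is true
    private boolean seenElement = false;

    BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Called for every element; tracks the highest timestamp seen so far.
    long extractTimestamp(long elementTimestamp) {
        maxTimestamp = seenElement ? Math.max(maxTimestamp, elementTimestamp) : elementTimestamp;
        seenElement = true;
        return elementTimestamp;
    }

    // Called periodically; an instance that saw no element has no watermark to offer.
    long currentWatermark() {
        return seenElement ? maxTimestamp - maxOutOfOrderness : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long oneDay = 24L * 60 * 60 * 1000;
        BoundedOutOfOrdernessSketch busy = new BoundedOutOfOrdernessSketch(oneDay);
        BoundedOutOfOrdernessSketch idle = new BoundedOutOfOrdernessSketch(oneDay);
        busy.extractTimestamp(3 * oneDay);
        busy.extractTimestamp(2 * oneDay); // a late element does not lower the max
        System.out.println(busy.currentWatermark() == 2 * oneDay);     // true
        System.out.println(idle.currentWatermark() == Long.MIN_VALUE); // true
    }
}
```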
> >>
> >> After keyBy:
> >> Flink uses the hash code of the key and the parallelism of the downstream
> >> operator to decide which subtask receives the element. This means that if
> >> your key is always the same, all the sources will send their elements to
> >> the same downstream task, for example only
> >> BoundedOutOfOrdernessTimestampExtractor no. 3.
> >>
> >> Before keyBy:
> >> In your case, the source and the BoundedOutOfOrdernessTimestampExtractors
> >> would be chained together, which means every
> >> BoundedOutOfOrdernessTimestampExtractor receives elements.
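A minimal sketch of the routing property described here, assuming a deliberately simplified partitioner: Flink's real assignment goes through key groups and a max parallelism (see `KeyGroupRangeAssignment`), but it shares the property that matters, namely that the target subtask depends only on the key. `KeyRoutingSketch` and `targetSubtask` are hypothetical names.

```java
// Hypothetical, simplified keyBy routing; Flink additionally hashes keys into key groups.
class KeyRoutingSketch {
    // The target subtask is a pure function of the key and the parallelism.
    static int targetSubtask(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 2;

        int[] constantKey = new int[parallelism];
        for (int i = 0; i < 1_000; i++) {
            constantKey[targetSubtask(0L, parallelism)]++; // like keyBy(trip -> 0L)
        }
        System.out.println(constantKey[0] + " / " + constantKey[1]); // 1000 / 0

        int[] perUser = new int[parallelism];
        for (long userId = 1; userId <= 1_000; userId++) {
            perUser[targetSubtask(userId, parallelism)]++; // like keyBy(trip -> trip.userId)
        }
        System.out.println(perUser[0] + " / " + perUser[1]); // 500 / 500
    }
}
```

With a constant key such as `0L`, one subtask gets everything and the other never sees a record; with many distinct user ids the load spreads out (perfectly evenly here only because `Long.hashCode` of small consecutive values alternates parity).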
> >>
> >> Best,
> >> Guowei
> >>
> >>
> >> On Fri, Apr 19, 2019 at 10:41 PM, an0 wrote:
> >>
> >>> Hi,
> >>>
> >>> First of all, thank you for the `shuffle()` tip. It works. However, I
> >>> still don't understand why it doesn't work without calling `shuffle()`.
> >>>
> >>> Why wouldn't all BoundedOutOfOrdernessTimestampExtractors receive trips?
> >>> All the trips have keys and timestamps. As I said in my reply to Paul, I
> >>> see the same watermarks being extracted.
> >>>
> >>> How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy`
> >>> matter? My understanding is that any specific window for a specific key
> >>> always receives exactly the same data, and the order in which
> >>> `assignTimestampsAndWatermarks` and `keyBy` are called shouldn't affect that.
> >>>
> >>> To make `keyBy` as irrelevant as possible, I tried letting it always
> >>> return the same key so that there is only 1 keyed stream and it is exactly
> >>> the same as the original unkeyed stream. It still doesn't trigger windows:
> >>> ```java
> >>> DataStream<Trip> trips = env.addSource(consumer);
> >>> // Constant key: every trip is routed to the same keyed subtask.
> >>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
> >>> DataStream<Trip> featurizedUserTrips =
> >>>     userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> >>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>             @Override
> >>>             public long extractTimestamp(Trip trip) {
> >>>                 return trip.endTime.getTime();
> >>>             }
> >>>         });
> >>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>> ```
> >>>
> >>> It makes no sense to me. Please help me understand why it doesn't work.
> >>> Thanks!
> >>>
> >>> On 2019/04/19 04:14:31, Guowei Ma wrote:
> >>>> Hi,
> >>>> After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors
> >>>> receive elements (trips). If that is the case, a
> >>>> BoundedOutOfOrdernessTimestampExtractor that does not receive any element
> >>>> will not send a WM, so the timeWindowAll operator cannot be triggered.
> >>>> You could add a shuffle() before the assignTimestampsAndWatermarks in your
> >>>> second case and check whether the window is triggered. If it is, you could
> >>>> check the distribution of the elements generated by the source.
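The effect of the suggested `shuffle()` can be illustrated with a toy model: `shuffle()` sends each record to a randomly chosen downstream channel, so with any realistic number of records every downstream extractor receives input with overwhelming probability. The sketch below assumes a fixed seed for reproducibility and hard-codes the constant key's target as channel 0; `ShuffleSketch` is hypothetical and does not use Flink's partitioner classes.

```java
import java.util.Random;

// Hypothetical model of shuffle() versus a constant-key keyBy; not Flink code.
class ShuffleSketch {
    // Returns how many records each downstream channel receives.
    static int[] distribute(int records, int parallelism, boolean shuffle, long seed) {
        Random random = new Random(seed);
        int[] received = new int[parallelism];
        for (int i = 0; i < records; i++) {
            // shuffle: random channel per record; constant key: always channel 0 here
            int target = shuffle ? random.nextInt(parallelism) : 0;
            received[target]++;
        }
        return received;
    }

    public static void main(String[] args) {
        int[] keyed = distribute(1_000, 2, false, 42L);
        int[] shuffled = distribute(1_000, 2, true, 42L);
        System.out.println("keyBy(0L): " + keyed[0] + " / " + keyed[1]); // keyBy(0L): 1000 / 0
        System.out.println("shuffle(): " + shuffled[0] + " / " + shuffled[1]);
    }
}
```

Once every downstream extractor has seen at least one element, each can emit watermarks, which is consistent with the window firing after adding `shuffle()`.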
> >>>>
> >>>> Best,
> >>>> Guowei
> >>>>
> >>>>
> >>>> On Fri, Apr 19, 2019 at 4:10 AM, [email protected] wrote:
> >>>>
> >>>>> I don't think it is the watermark. I see the same watermarks from the two
> >>>>> versions of the code.
> >>>>>
> >>>>> The processing on the keyed stream doesn't change event time at all. I can
> >>>>> simply change my code to use a `map` on the keyed stream that returns the
> >>>>> input unchanged, so that the window operator receives exactly the same
> >>>>> data. The only difference is where I call `assignTimestampsAndWatermarks`.
> >>>>> The result is the same: `assignTimestampsAndWatermarks` before `keyBy`
> >>>>> works:
> >>>>> ```java
> >>>>> // Timestamps and watermarks assigned right after the source, before keyBy.
> >>>>> DataStream<Trip> trips =
> >>>>>     env.addSource(consumer).assignTimestampsAndWatermarks(
> >>>>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>>>             @Override
> >>>>>             public long extractTimestamp(Trip trip) {
> >>>>>                 return trip.endTime.getTime();
> >>>>>             }
> >>>>>         });
> >>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>> // Identity map, so the window operator receives exactly the same records.
> >>>>> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> >>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>> ```
> >>>>>
> >>>>> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> >>>>> ```java
> >>>>> DataStream<Trip> trips = env.addSource(consumer);
> >>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>> // Timestamps and watermarks assigned after keyBy, on each keyed subtask.
> >>>>> DataStream<Trip> featurizedUserTrips =
> >>>>>     userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> >>>>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>>>             @Override
> >>>>>             public long extractTimestamp(Trip trip) {
> >>>>>                 return trip.endTime.getTime();
> >>>>>             }
> >>>>>         });
> >>>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>> ```
> >>>>>
> >>>>> It feels like a bug to me, but I want to confirm before I file a bug
> >>>>> report.
> >>>>>
> >>>>> On 2019/04/18 03:38:34, Paul Lam wrote:
> >>>>>> Hi,
> >>>>>>
> >>>>>> Could you check the watermark of the window operator? One possible
> >>>>>> situation would be that some of the keys are not getting enough input, so
> >>>>>> their watermarks remain below the window end time and hold the window
> >>>>>> operator's watermark back. IMO, it's good practice to assign watermarks
> >>>>>> earlier in the data pipeline.
> >>>>>> Best,
> >>>>>> Paul Lam
> >>>>>>
> >>>>>>> On Apr 17, 2019, at 23:04, [email protected] wrote:
> >>>>>>>
> >>>>>>> `assignTimestampsAndWatermarks` before `keyBy` works:
> >>>>>>> ```java
> >>>>>>> // Timestamps and watermarks assigned right after the source, before keyBy.
> >>>>>>> DataStream<Trip> trips =
> >>>>>>>     env.addSource(consumer).assignTimestampsAndWatermarks(
> >>>>>>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >>>>>>>             @Override
> >>>>>>>             public long extractTimestamp(Trip trip) {
> >>>>>>>                 return trip.endTime.getTime();
> >>>>>>>             }
> >>>>>>>         });
> >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> >>>>>>>     userTrips.process(new Featurization());
> >>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> >>>>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>>>> ```
> >>>>>>>
> >>>>>>> But not after `keyBy` and `process`:
> >>>>>>> ```java
> >>>>>>> DataStream<Trip> trips = env.addSource(consumer);
> >>>>>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> >>>>>>> // Timestamps and watermarks assigned after keyBy and process.
> >>>>>>> DataStream<FeaturizedTrip> featurizedUserTrips =
> >>>>>>>     userTrips.process(new Featurization()).assignTimestampsAndWatermarks(
> >>>>>>>         new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> >>>>>>>             @Override
> >>>>>>>             public long extractTimestamp(FeaturizedTrip trip) {
> >>>>>>>                 return trip.endTime.getTime();
> >>>>>>>             }
> >>>>>>>         });
> >>>>>>> AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> >>>>>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> >>>>>>> ```
> >>>>>>> Windows are never triggered.
> >>>>>>>
> >>>>>>> Is it a bug or expected behavior? If the latter, where is it documented?
> >>>>>>
>
>