Thanks, I feel I'm getting closer to the truth. So parallelism is the cause? Say my parallelism is 2. Does that mean I get 2 tasks running after `keyBy`, even if all elements have the same key and therefore all go to one downstream task (say, task 1)? And is it the other task (task 2), which receives no data, that keeps the `timeWindowAll` stream from progressing? That is, both task 1 and task 2 are inputs of the window operator, and because one of them is idle, its event time cannot advance?
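The mechanism behind these questions can be sketched in plain Java. This is a hypothetical model, not Flink API. `KeyByWatermarkDemo` and its methods are made-up names. The model rests on three assumptions. First, routing uses a simplified stand-in for Flink's key-group assignment (real Flink also murmur-hashes `key.hashCode()`). Second, each downstream subtask's watermark is its largest timestamp minus the out-of-orderness bound. Third, the single window operator's event time is the minimum over its input channels. With a constant key and parallelism 2, every element lands on one subtask and the other never emits a watermark. Spreading elements round-robin lets event time advance. That is roughly what `rebalance()` does, while `shuffle()` picks channels at random.

```java
/**
 * Hypothetical model (not Flink API) of keyBy routing followed by one
 * bounded-out-of-orderness extractor per downstream subtask, all feeding a
 * single window operator whose event time is the minimum over its inputs.
 */
class KeyByWatermarkDemo {
    // Assumed max parallelism, chosen for illustration only.
    static final int MAX_PARALLELISM = 128;

    /** Simplified key-group routing; real Flink murmur-hashes key.hashCode() first. */
    static int subtaskForKey(Object key, int parallelism) {
        int keyGroup = Math.floorMod(key.hashCode(), MAX_PARALLELISM);
        return keyGroup * parallelism / MAX_PARALLELISM;
    }

    /**
     * Routes every element, then returns the window operator's watermark:
     * the minimum of each subtask's (max timestamp - bound), where a subtask
     * that saw no element contributes Long.MIN_VALUE.
     */
    static long downstreamWatermark(long[] timestamps, long[] keys, int parallelism,
                                    boolean roundRobin, long bound) {
        long[] maxTs = new long[parallelism];
        boolean[] sawData = new boolean[parallelism];
        for (int i = 0; i < timestamps.length; i++) {
            int target = roundRobin ? i % parallelism : subtaskForKey(keys[i], parallelism);
            maxTs[target] = sawData[target] ? Math.max(maxTs[target], timestamps[i]) : timestamps[i];
            sawData[target] = true;
        }
        long min = Long.MAX_VALUE;
        for (int s = 0; s < parallelism; s++) {
            // A subtask that never saw an element never emits a watermark.
            long watermark = sawData[s] ? maxTs[s] - bound : Long.MIN_VALUE;
            min = Math.min(min, watermark);
        }
        return min;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        long[] ts = {10 * day, 11 * day, 12 * day, 13 * day};
        long[] constantKey = {0L, 0L, 0L, 0L}; // like keyBy(trip -> 0L)

        // Parallelism 2: subtask 1 never gets data, so event time is stuck.
        System.out.println(downstreamWatermark(ts, constantKey, 2, false, day) == Long.MIN_VALUE);
        // Spreading elements over both subtasks lets event time advance to 11 days.
        System.out.println(downstreamWatermark(ts, constantKey, 2, true, day) == 11 * day);
    }
}
```

Both lines print `true`. A constant key always maps to one subtask under any deterministic hash, so the result does not depend on the simplified routing formula.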
On 2019/04/22 01:57:39, Guowei Ma <guowei....@gmail.com> wrote:
> Hi,
>
> A BoundedOutOfOrdernessTimestampExtractor can send a watermark only after it has received at least one element.
>
> After keyBy:
> Flink uses the hash code of the key and the parallelism of the downstream operator to decide which subtask receives the element. This means that if your key is always the same, all the sources will send their elements to the same downstream task only, for example only to BoundedOutOfOrdernessTimestampExtractor no. 3.
>
> Before keyBy:
> In your case, the source and the BoundedOutOfOrdernessTimestampExtractors are chained together, which means every BoundedOutOfOrdernessTimestampExtractor receives elements.
>
> Best,
> Guowei
>
>
> an0 <an0...@gmail.com> wrote on Friday, April 19, 2019, 10:41 PM:
>
> > Hi,
> >
> > First of all, thank you for the `shuffle()` tip. It works. However, I still don't understand why it doesn't work without calling `shuffle()`.
> >
> > Why wouldn't all BoundedOutOfOrdernessTimestampExtractors receive trips? All the trips have keys and timestamps. As I said in my reply to Paul, I see the same watermarks being extracted.
> >
> > How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy` matter? My understanding is that any specific window for a specific key always receives exactly the same data, and the calling order of `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
> >
> > To make `keyBy` as irrelevant as possible, I tried letting it always return the same key, so that there is only 1 keyed stream and it is exactly the same as the original unkeyed stream.
> > It still doesn't trigger windows:
> > ```java
> > DataStream<Trip> trips = env.addSource(consumer);
> > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
> > DataStream<Trip> featurizedUserTrips =
> >     userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> >         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >             @Override
> >             public long extractTimestamp(Trip trip) {
> >                 return trip.endTime.getTime();
> >             }
> >         });
> > AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > ```
> >
> > It makes no sense to me. Please help me understand why it doesn't work. Thanks!
> >
> > On 2019/04/19 04:14:31, Guowei Ma <guowei....@gmail.com> wrote:
> > > Hi,
> > > After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors receive elements (trips). If that is the case, a BoundedOutOfOrdernessTimestampExtractor that does not receive any element will not send a watermark, so the timeWindowAll operator cannot be triggered.
> > > You could add a shuffle() before the assignTimestampsAndWatermarks in your second case and check whether the window is triggered. If it is, you could then check the distribution of the elements generated by the source.
> > >
> > > Best,
> > > Guowei
> > >
> > >
> > > an0...@gmail.com <an0...@gmail.com> wrote on Friday, April 19, 2019, 4:10 AM:
> > >
> > > > I don't think it is the watermark. I see the same watermarks from the two versions of the code.
> > > >
> > > > The processing on the keyed stream doesn't change event time at all. I can simply change my code to use `map` on the keyed stream to return the input data unchanged, so that the window operator receives exactly the same data. The only difference is when I call `assignTimestampsAndWatermarks`.
> > > > The result is the same: calling `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > ```java
> > > > DataStream<Trip> trips =
> > > >     env.addSource(consumer).assignTimestampsAndWatermarks(
> > > >         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >             @Override
> > > >             public long extractTimestamp(Trip trip) {
> > > >                 return trip.endTime.getTime();
> > > >             }
> > > >         });
> > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> > > > AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > >     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > ```
> > > >
> > > > Calling `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> > > > ```java
> > > > DataStream<Trip> trips = env.addSource(consumer);
> > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > DataStream<Trip> featurizedUserTrips =
> > > >     userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
> > > >         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >             @Override
> > > >             public long extractTimestamp(Trip trip) {
> > > >                 return trip.endTime.getTime();
> > > >             }
> > > >         });
> > > > AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > >     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > ```
> > > >
> > > > It feels like a bug to me, but I want to confirm it before I file a bug report.
> > > >
> > > > On 2019/04/18 03:38:34, Paul Lam <paullin3...@gmail.com> wrote:
> > > > > Hi,
> > > > >
> > > > > Could you check the watermark of the window operator? One possible situation is that some of the keys are not getting enough input, so their watermarks remain below the window end time and hold back the window operator's watermark. IMO, it's good practice to assign watermarks earlier in the data pipeline.
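Paul's point about watermarks staying below the window end time can be made concrete with a small model of how a bounded-out-of-orderness extractor computes its watermark. This is a hypothetical plain-Java stand-in. `BoundedOutOfOrdernessModel` is not a Flink class. It assumes three behaviors: the extractor tracks the largest timestamp seen, it reports that value minus the bound, and it never moves its watermark backwards. An instance that never receives an element keeps reporting `Long.MIN_VALUE`. Any downstream operator that takes the minimum over its inputs is held back by it.

```java
/**
 * Hypothetical plain-Java model (not the Flink class) of a
 * bounded-out-of-orderness watermark: it trails the largest timestamp
 * seen so far by a fixed bound.
 */
class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrderness;
    // Chosen so that (currentMaxTimestamp - maxOutOfOrderness) == Long.MIN_VALUE
    // until the first element arrives.
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    /** Called per element: remembers the largest event timestamp. */
    void onElement(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    /** Called periodically: the watermark never moves backwards. */
    long currentWatermark() {
        long candidate = currentMaxTimestamp - maxOutOfOrderness;
        lastEmittedWatermark = Math.max(lastEmittedWatermark, candidate);
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        long oneDay = 24L * 60 * 60 * 1000;
        BoundedOutOfOrdernessModel busy = new BoundedOutOfOrdernessModel(oneDay);
        BoundedOutOfOrdernessModel idle = new BoundedOutOfOrdernessModel(oneDay);

        busy.onElement(10 * oneDay);
        System.out.println(busy.currentWatermark() == 9 * oneDay);     // true
        // No element ever arrived, so this instance is stuck at the minimum.
        System.out.println(idle.currentWatermark() == Long.MIN_VALUE); // true
    }
}
```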
> > > > >
> > > > > Best,
> > > > > Paul Lam
> > > > >
> > > > > > On April 17, 2019, at 23:04, an0...@gmail.com wrote:
> > > > > >
> > > > > > `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > > > ```java
> > > > > > DataStream<Trip> trips =
> > > > > >     env.addSource(consumer).assignTimestampsAndWatermarks(
> > > > > >         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > > >             @Override
> > > > > >             public long extractTimestamp(Trip trip) {
> > > > > >                 return trip.endTime.getTime();
> > > > > >             }
> > > > > >         });
> > > > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > > > DataStream<FeaturizedTrip> featurizedUserTrips = userTrips.process(new Featurization());
> > > > > > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > > > >     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > > ```
> > > > > >
> > > > > > But it doesn't work after `keyBy` and `process`:
> > > > > > ```java
> > > > > > DataStream<Trip> trips = env.addSource(consumer);
> > > > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > > > DataStream<FeaturizedTrip> featurizedUserTrips =
> > > > > >     userTrips.process(new Featurization()).assignTimestampsAndWatermarks(
> > > > > >         new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> > > > > >             @Override
> > > > > >             public long extractTimestamp(FeaturizedTrip trip) {
> > > > > >                 return trip.endTime.getTime();
> > > > > >             }
> > > > > >         });
> > > > > > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > > > >     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > > > ```
> > > > > > Windows are never triggered.
> > > > > >
> > > > > > Is it a bug or expected behavior? If the latter, where is it documented?
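A stuck watermark means windows are never triggered. To see why, here is a sketch of event-time sliding windows like `timeWindowAll(Time.days(7), Time.days(1))`. It makes four assumptions:

- the job runs in event time,
- windows are aligned to the epoch with no offset,
- timestamps are non-negative,
- a window `[start, end)` fires once the operator's watermark reaches `end - 1`.

`SlidingWindowModel` is a hypothetical name, not Flink API. Each element belongs to size / slide = 7 overlapping windows. If the operator's watermark stays at `Long.MIN_VALUE`, none of them ever satisfies the firing condition.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Flink API) of epoch-aligned event-time sliding
 * windows with size 7 days and slide 1 day, plus the firing condition.
 */
class SlidingWindowModel {
    static final long DAY = 24L * 60 * 60 * 1000;
    static final long SIZE = 7 * DAY;
    static final long SLIDE = DAY;

    /** Returns the [start, end) bounds of every window containing a non-negative timestamp. */
    static List<long[]> assignWindows(long timestamp) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window containing the timestamp, aligned to SLIDE.
        long lastStart = timestamp - Math.floorMod(timestamp, SLIDE);
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - SIZE; start -= SLIDE) {
            windows.add(new long[] {start, start + SIZE});
        }
        return windows;
    }

    /** A window fires once the watermark reaches its last millisecond (end - 1). */
    static boolean fires(long[] window, long watermark) {
        return watermark >= window[1] - 1;
    }

    public static void main(String[] args) {
        List<long[]> windows = assignWindows(10 * DAY + 5);
        System.out.println(windows.size()); // 7 overlapping windows

        // The last window added, [4 days, 11 days), is the first to close.
        long[] firstToClose = windows.get(windows.size() - 1);
        System.out.println(fires(firstToClose, Long.MIN_VALUE)); // false: watermark stuck
        System.out.println(fires(firstToClose, 11 * DAY - 1));   // true
    }
}
```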