I don't think it is the watermark. I see the same watermarks in both versions of the code.
The processing on the keyed stream doesn't change event time at all. I simplified my code to use a `map` on the keyed stream that just returns its input, so the window operator receives exactly the same data in both versions. The only difference is where I call `assignTimestampsAndWatermarks`, and the result is the same as before.

`assignTimestampsAndWatermarks` before `keyBy` works:

```java
// Timestamps and watermarks are assigned on the source stream, before keyBy
DataStream<Trip> trips = env.addSource(consumer)
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
            @Override
            public long extractTimestamp(Trip trip) {
                return trip.endTime.getTime();
            }
        });
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
// Identity map: the window operator sees exactly the same records
DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
        featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
```

`assignTimestampsAndWatermarks` after `keyBy` doesn't work:

```java
DataStream<Trip> trips = env.addSource(consumer);
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
// Same identity map, but timestamps and watermarks are assigned after keyBy
DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip)
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
            @Override
            public long extractTimestamp(Trip trip) {
                return trip.endTime.getTime();
            }
        });
AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
        featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
```

It feels like a bug to me, but I want to confirm before I file a bug report.

On 2019/04/18 03:38:34, Paul Lam <paullin3...@gmail.com> wrote:
> Hi,
>
> Could you check the watermark of the window operator? One possible situation
> would be some of the keys are not getting enough inputs, so their watermarks
> remain below the window end time and hold the window operator watermark back.
> IMO, it’s a good practice to assign watermark earlier in the data pipeline.
>
> Best,
> Paul Lam
>
> > On Apr 17, 2019, at 23:04, an0...@gmail.com wrote:
> >
> > `assignTimestampsAndWatermarks` before `keyBy` works:
> > ```java
> > DataStream<Trip> trips = env.addSource(consumer)
> >         .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >             @Override
> >             public long extractTimestamp(Trip trip) {
> >                 return trip.endTime.getTime();
> >             }
> >         });
> > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > DataStream<FeaturizedTrip> featurizedUserTrips = userTrips.process(new Featurization());
> > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > ```
> >
> > But not after `keyBy` and `process`:
> > ```java
> > DataStream<Trip> trips = env.addSource(consumer);
> > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > DataStream<FeaturizedTrip> featurizedUserTrips = userTrips.process(new Featurization())
> >         .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> >             @Override
> >             public long extractTimestamp(FeaturizedTrip trip) {
> >                 return trip.endTime.getTime();
> >             }
> >         });
> > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > ```
> > Windows are never triggered.
> >
> > Is it a bug or expected behavior? If the latter, where is it documented?
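Paul's suggestion can be illustrated with a simplified model. In Flink, an operator with several input channels, such as the non-parallel `timeWindowAll` operator here, advances its event-time clock to the minimum of the watermarks it has received on all of its inputs. The sketch below is plain Java, not Flink code: `AssignerInstance` and `combinedWatermark` are hypothetical names, and it assumes (without having confirmed it for this job) that after `keyBy` some parallel instances of the assigner receive no records at all, so their watermark never rises above `Long.MIN_VALUE`.

```java
import java.util.Arrays;

/**
 * Simplified model (NOT Flink code) of how a single-parallelism windowAll operator
 * combines the watermarks of several upstream parallel assigner instances.
 */
public class WatermarkMinDemo {

    /** Hypothetical stand-in for one parallel instance of a bounded-out-of-orderness assigner. */
    static final class AssignerInstance {
        private final long maxOutOfOrderness;
        private long maxTimestamp = Long.MIN_VALUE;
        private boolean seenAny = false;

        AssignerInstance(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
        }

        /** Track the largest event timestamp this instance has seen. */
        void onElement(long timestamp) {
            seenAny = true;
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        /** Watermark = max timestamp minus the bound; stays at Long.MIN_VALUE until input arrives. */
        long currentWatermark() {
            return seenAny ? maxTimestamp - maxOutOfOrderness : Long.MIN_VALUE;
        }
    }

    /** The downstream operator's event time is the minimum watermark over all input channels. */
    static long combinedWatermark(AssignerInstance[] inputs) {
        return Arrays.stream(inputs)
                .mapToLong(AssignerInstance::currentWatermark)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        final long day = 24L * 60 * 60 * 1000;
        AssignerInstance[] inputs = {new AssignerInstance(day), new AssignerInstance(day)};

        // Assumption: all keys hash to instance 0, so instance 1 never receives a record.
        for (long t = 0; t <= 10 * day; t += day) {
            inputs[0].onElement(t);
        }

        long windowEnd = 7 * day; // a window covering [0, 7 days)
        long wm = combinedWatermark(inputs);
        System.out.println("instance 0 watermark = " + inputs[0].currentWatermark());
        System.out.println("combined watermark   = " + wm);
        // A time window fires once the watermark reaches its last timestamp (end - 1).
        System.out.println("window fires: " + (wm >= windowEnd - 1));
    }
}
```

In this model instance 0 advances to 9 days, but the idle instance pins the combined watermark at `Long.MIN_VALUE`, so the window never fires. Assigning watermarks before `keyBy`, as Paul recommends, avoids depending on how keys are distributed across the assigner instances.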