Calling `assignTimestampsAndWatermarks` before `keyBy` works:
```java
DataStream<Trip> trips = env.addSource(consumer)
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
                    @Override
                    public long extractTimestamp(Trip trip) {
                        return trip.endTime.getTime();
                    }
                });
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
DataStream<FeaturizedTrip> featurizedUserTrips =
        userTrips.process(new Featurization());
AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
        featurizedUserTrips.timeWindowAll(Time.days(7),
                Time.days(1));
```

But calling it after `keyBy` and `process` does not:
```java
DataStream<Trip> trips = env.addSource(consumer);
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
DataStream<FeaturizedTrip> featurizedUserTrips = userTrips
        .process(new Featurization())
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
                    @Override
                    public long extractTimestamp(FeaturizedTrip trip) {
                        return trip.endTime.getTime();
                    }
                });
AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
        featurizedUserTrips.timeWindowAll(Time.days(7),
                Time.days(1));
```
With this version, the windows are never triggered.

Is this a bug or expected behavior? If it is expected, where is it documented?
