I don't think it is the watermark. I see the same watermarks from the two
versions of the code.
The processing on the keyed stream doesn't change event time at all. I can
simply change my code to use `map` on the keyed stream to return the input
data unchanged, so that the window operator receives exactly the same data.
The only difference is where I call `assignTimestampsAndWatermarks`. The
result is the same. `assignTimestampsAndWatermarks` before `keyBy` works:
```java
DataStream<Trip> trips = env
    .addSource(consumer)
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
            @Override
            public long extractTimestamp(Trip trip) {
                return trip.endTime.getTime();
            }
        });
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
    featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
```
`assignTimestampsAndWatermarks` after `keyBy` doesn't work:
```java
DataStream<Trip> trips = env.addSource(consumer);
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
DataStream<Trip> featurizedUserTrips = userTrips
    .map(trip -> trip)
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
            @Override
            public long extractTimestamp(Trip trip) {
                return trip.endTime.getTime();
            }
        });
AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
    featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
```
It feels like a bug to me, but I want to confirm it before I file a bug report.
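
For reference, here is a minimal, Flink-free sketch of the rule Paul describes
in the quoted reply below: a downstream operator's watermark is the minimum of
the watermarks on its input channels, so one channel that has seen no records
can hold everything back. The class and method names are hypothetical, and
this only models the behavior as I understand it; it is not Flink's actual
implementation.

```java
import java.util.Arrays;

public class MinWatermarkSketch {
    // Assumed placeholder for "no watermark yet" on a channel with no records.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // Bounded out-of-orderness rule: highest timestamp seen so far minus the
    // allowed lateness. An empty channel never advances its watermark.
    static long channelWatermark(long[] timestamps, long maxOutOfOrdernessMs) {
        if (timestamps.length == 0) {
            return NO_WATERMARK;
        }
        long maxSeen = Arrays.stream(timestamps).max().getAsLong();
        return maxSeen - maxOutOfOrdernessMs;
    }

    // The downstream operator follows the slowest input channel.
    static long operatorWatermark(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(NO_WATERMARK);
    }

    // An event-time window fires once the watermark reaches its last timestamp.
    static boolean windowFires(long watermark, long windowEndMs) {
        return watermark >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        long busy = channelWatermark(new long[] {10 * day, 12 * day, 11 * day}, day);
        long idle = channelWatermark(new long[] {}, day);

        long combined = operatorWatermark(new long[] {busy, idle});
        System.out.println("busy channel alone fires: " + windowFires(busy, 7 * day));
        System.out.println("with idle channel fires:  " + windowFires(combined, 7 * day));
    }
}
```

If one of the parallel subtasks after `keyBy` receives no records, its
watermark would never advance, and that would hold back the `timeWindowAll`
operator in the same way. That is an assumption about my job, not something I
have confirmed.
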
On 2019/04/18 03:38:34, Paul Lam <[email protected]> wrote:
> Hi,
>
> Could you check the watermark of the window operator? One possible situation
> would be that some of the keys are not getting enough input, so their
> watermarks remain below the window end time and hold the window operator's
> watermark back. IMO, it’s good practice to assign watermarks earlier in the
> data pipeline.
>
> Best,
> Paul Lam
>
> > On April 17, 2019, at 23:04, [email protected] wrote:
> >
> > `assignTimestampsAndWatermarks` before `keyBy` works:
> > ```java
> > DataStream<Trip> trips = env
> >     .addSource(consumer)
> >     .assignTimestampsAndWatermarks(
> >         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >             @Override
> >             public long extractTimestamp(Trip trip) {
> >                 return trip.endTime.getTime();
> >             }
> >         });
> > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > DataStream<FeaturizedTrip> featurizedUserTrips =
> >     userTrips.process(new Featurization());
> > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> >     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > ```
> >
> > But not after `keyBy` and `process`:
> > ```java
> > DataStream<Trip> trips = env.addSource(consumer);
> > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > DataStream<FeaturizedTrip> featurizedUserTrips = userTrips
> >     .process(new Featurization())
> >     .assignTimestampsAndWatermarks(
> >         new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> >             @Override
> >             public long extractTimestamp(FeaturizedTrip trip) {
> >                 return trip.endTime.getTime();
> >             }
> >         });
> > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> >     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > ```
> > Windows are never triggered.
> >
> > Is it a bug or expected behavior? If the latter, where is it documented?
> >
>
>