I don't think it is the watermark. I see the same watermarks in both versions 
of the code.

The processing on the keyed stream doesn't change event time at all. I can 
simply change my code to use `map` on the keyed stream to return the input 
data unchanged, so that the window operator receives exactly the same data. The 
only difference is where I call `assignTimestampsAndWatermarks`. The result is 
the same: `assignTimestampsAndWatermarks` before `keyBy` works:
```java
// Timestamps and watermarks are assigned directly on the source stream.
DataStream<Trip> trips =
        env.addSource(consumer).assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
                    @Override
                    public long extractTimestamp(Trip trip) {
                        return trip.endTime.getTime();
                    }
                });
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
// Identity map: the window operator receives exactly the same elements.
DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
        featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
```
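For context, as we understand Flink 1.x, `BoundedOutOfOrdernessTimestampExtractor` emits a watermark equal to the largest timestamp it has seen so far minus the configured bound (one day here), and until it sees an element its watermark stays at `Long.MIN_VALUE`. The plain-Java sketch below models only that rule; `BoundedLagWatermarks` is a hypothetical name, not a Flink class, and details such as periodic emission are left out.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of a bounded-out-of-orderness watermark
// generator. Not Flink's actual class.
class BoundedLagWatermarks {
    private final long maxOutOfOrdernessMs;
    // Starts low enough that the initial watermark is exactly Long.MIN_VALUE.
    private long currentMaxTimestamp;

    BoundedLagWatermarks(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    // Called once per element, e.g. with trip.endTime.getTime().
    public void onEvent(long timestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestampMs);
    }

    // Watermark = largest timestamp seen so far minus the allowed lag.
    public long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        long oneDay = TimeUnit.DAYS.toMillis(1);
        BoundedLagWatermarks wm = new BoundedLagWatermarks(oneDay);
        System.out.println(wm.currentWatermark() == Long.MIN_VALUE); // true: no data yet
        wm.onEvent(10 * oneDay);
        wm.onEvent(8 * oneDay); // an out-of-order element does not move the watermark back
        System.out.println(wm.currentWatermark() == 9 * oneDay); // true
    }
}
```

The key point for this thread is the starting value: an extractor instance that never receives an element never advances past `Long.MIN_VALUE`.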

`assignTimestampsAndWatermarks` after `keyBy` doesn't work:
```java
DataStream<Trip> trips = env.addSource(consumer);
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
// Timestamps and watermarks are assigned after keyBy and the identity map.
DataStream<Trip> featurizedUserTrips =
        userTrips.map(trip -> trip).assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
                    @Override
                    public long extractTimestamp(Trip trip) {
                        return trip.endTime.getTime();
                    }
                });
AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
        featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
```
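Both versions use the same window definition. Assuming the job runs in event time, `timeWindowAll(Time.days(7), Time.days(1))` places each trip into seven overlapping 7-day windows aligned to 1-day boundaries, and a window is emitted only once the operator's watermark reaches the window's last millisecond. The hypothetical `SlidingWindowModel` below approximates that assignment and firing rule in plain Java (zero window offset, non-negative timestamps); it is a sketch, not Flink's `SlidingEventTimeWindows` or `EventTimeTrigger` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical model of event-time sliding windows (size 7 days, slide 1 day).
class SlidingWindowModel {
    static final long DAY = TimeUnit.DAYS.toMillis(1);
    static final long SIZE = 7 * DAY;
    static final long SLIDE = DAY;

    // Returns the [start, end) windows containing the timestamp, latest first.
    // Assumes a non-negative timestamp and a zero window offset.
    static List<long[]> assignWindows(long timestamp) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = timestamp - (timestamp % SLIDE);
        for (long start = lastStart; start > timestamp - SIZE; start -= SLIDE) {
            windows.add(new long[] {start, start + SIZE});
        }
        return windows;
    }

    // A window fires once the watermark reaches its last millisecond (end - 1).
    static boolean fires(long[] window, long watermark) {
        return watermark >= window[1] - 1;
    }

    public static void main(String[] args) {
        long ts = 10 * DAY + 5;
        List<long[]> ws = assignWindows(ts);
        System.out.println(ws.size()); // 7
        long[] earliest = ws.get(ws.size() - 1);
        System.out.println(fires(earliest, Long.MIN_VALUE)); // false: stuck watermark
        System.out.println(fires(earliest, earliest[1] - 1)); // true
    }
}
```

If the watermark arriving at the window operator never rises above `Long.MIN_VALUE`, no window can meet this condition, which would match the "windows are never triggered" symptom.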

It feels like a bug to me, but I want to confirm before I file a bug report.
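One mechanism that could explain the difference even with identical data: an operator with several upstream channels, such as the parallelism-1 operator behind `timeWindowAll`, advances its event-time clock only to the minimum watermark across those channels. When the extractor runs before `keyBy` (assuming a single source subtask, or all source subtasks receiving data), every parallel `map` instance forwards the same watermark. When it runs after `keyBy`, each instance computes a watermark only from the keys hashed to it, and an instance that receives no trips stays at `Long.MIN_VALUE`, holding the window operator back indefinitely. This is close to what Paul Lam suggests in the reply quoted below. The sketch is a hypothetical plain-Java model (`WatermarkMinModel`, three instances), not Flink code.

```java
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

// Hypothetical model: a downstream operator keeps the latest watermark per
// upstream channel and advances only to the minimum of them.
class WatermarkMinModel {
    static long combinedWatermark(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    // Bounded-out-of-orderness rule: max timestamp seen minus the bound,
    // or Long.MIN_VALUE if this instance saw no elements at all.
    static long extractorWatermark(long[] timestampsSeen, long boundMs) {
        if (timestampsSeen.length == 0) {
            return Long.MIN_VALUE;
        }
        return Arrays.stream(timestampsSeen).max().getAsLong() - boundMs;
    }

    public static void main(String[] args) {
        long day = TimeUnit.DAYS.toMillis(1);
        // Three parallel map instances after keyBy; the third happens to
        // receive no user IDs, so its extractor never sees a trip.
        long[][] tripsPerInstance = {{20 * day, 21 * day}, {22 * day}, {}};

        // Extractor after keyBy/map: one independent extractor per instance.
        long[] perInstance = new long[tripsPerInstance.length];
        for (int i = 0; i < tripsPerInstance.length; i++) {
            perInstance[i] = extractorWatermark(tripsPerInstance[i], day);
        }
        System.out.println(combinedWatermark(perInstance) == Long.MIN_VALUE); // true: stuck

        // Extractor before keyBy: one watermark computed over all trips and
        // forwarded identically to every instance.
        long upstream = extractorWatermark(new long[] {20 * day, 21 * day, 22 * day}, day);
        long[] forwarded = {upstream, upstream, upstream};
        System.out.println(combinedWatermark(forwarded) == 21 * day); // true
    }
}
```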

On 2019/04/18 03:38:34, Paul Lam <paullin3...@gmail.com> wrote: 
> Hi,
> 
> Could you check the watermark of the window operator? One possible situation 
> is that some of the keys are not getting enough input, so their watermarks 
> remain below the window end time and hold back the window operator's watermark. 
> IMO, it’s good practice to assign watermarks early in the data pipeline.
> 
> Best,
> Paul Lam
> 
> > On 2019/04/17 23:04, an0...@gmail.com wrote:
> > 
> > `assignTimestampsAndWatermarks` before `keyBy` works:
> > ```java
> > DataStream<Trip> trips =
> >         env.addSource(consumer).assignTimestampsAndWatermarks(
> >                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >                     @Override
> >                     public long extractTimestamp(Trip trip) {
> >                         return trip.endTime.getTime();
> >                     }
> >                 });
> > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > DataStream<FeaturizedTrip> featurizedUserTrips = userTrips.process(new Featurization());
> > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > ```
> > 
> > But not after `keyBy` and `process`:
> > ```java
> > DataStream<Trip> trips = env.addSource(consumer);
> > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > DataStream<FeaturizedTrip> featurizedUserTrips =
> >         userTrips.process(new Featurization()).assignTimestampsAndWatermarks(
> >                 new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> >                     @Override
> >                     public long extractTimestamp(FeaturizedTrip trip) {
> >                         return trip.endTime.getTime();
> >                     }
> >                 });
> > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > ```
> > Windows are never triggered.
> > 
> > Is it a bug or expected behavior? If the latter, where is it documented?
> > 
> 
> 
